diff options
author | Alex Shatov <alexs@att.com> | 2018-03-13 17:50:51 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-03-13 17:50:51 -0400 |
commit | a2b262994a62286e1c98fd9939115c6e64ed27ee (patch) | |
tree | f6ef7056c70a34485d1ed047a296b89bb3354ee2 /policyhandler | |
parent | b9b955c0c2875effc1ce02d5576f0d2a59284c5d (diff) |
2.3.0 policy-handler - periodically catch_up
- periodically catchup - interval is configurable
= max_skips defines the number of times the catch_up
message that is identical to previous one can be skipped
- do not catchup more often than the interval
even between the manual catchup and auto catchup
- do not send the same catchup message twice in a row
to the deployment-handler but not exceed a hard limit
on catchup max_skips
- catchup if the deployment-handler instance is changed
Change-Id: I9a3fcc941e8a9e553abb3952dd882c37e0f5fdfe
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-389
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/deploy_handler.py | 31 | ||||
-rw-r--r-- | policyhandler/policy_consts.py | 1 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 118 | ||||
-rw-r--r-- | policyhandler/policy_utils.py | 69 | ||||
-rw-r--r-- | policyhandler/step_timer.py | 63 |
5 files changed, 243 insertions, 39 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 0950dfd..493b7d5 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -41,6 +41,7 @@ class DeployHandler(object): _url_path = None _target_entity = None _custom_kwargs = None + _server_instance_uuid = None @staticmethod def _lazy_init(audit): @@ -72,7 +73,11 @@ class DeployHandler(object): @staticmethod def policy_update(audit, message): - """post policy_updated message to deploy-handler""" + """ + post policy_updated message to deploy-handler + + returns condition whether it needs to catch_up + """ if not message: return @@ -116,8 +121,24 @@ class DeployHandler(object): sub_aud.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - sub_aud.metrics("response {0} from {1}: text={2}{3}" \ - .format(res.status_code, log_action, res.text, log_data)) + log_line = "response {0} from {1}: text={2}{3}" \ + .format(res.status_code, log_action, res.text, log_data) + sub_aud.metrics(log_line) + DeployHandler._logger.info(log_line) + + if res.status_code != requests.codes.ok: + return + + result = res.json() or {} + prev_server_instance_uuid = DeployHandler._server_instance_uuid + DeployHandler._server_instance_uuid = result.get("server_instance_uuid") + + need_to_catch_up = (prev_server_instance_uuid + and prev_server_instance_uuid != DeployHandler._server_instance_uuid) + if need_to_catch_up: + log_line = "need_to_catch_up: {1} != {0}" \ + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid) + sub_aud.info(log_line) + DeployHandler._logger.info(log_line) - if res.status_code == requests.codes.ok: - return res.json() + return need_to_catch_up diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 013ed2a..90ede47 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -25,6 +25,7 @@ POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' CATCH_UP = "catch_up" +AUTO_CATCH_UP = "auto catch_up" LATEST_POLICIES = "latest_policies" REMOVED_POLICIES = "removed_policies" ERRORED_POLICIES = "errored_policies" diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 01be550..e12af88 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -18,15 +18,21 @@ """policy-updater thread""" +import copy import json import logging from Queue import Queue from threading import Lock, Thread +from .config import Config from .deploy_handler import DeployHandler from .onap.audit import Audit -from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, + REMOVED_POLICIES) from .policy_rest import PolicyRest +from .policy_utils import Utils +from .step_timer import StepTimer + class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -37,9 +43,17 @@ class PolicyUpdater(Thread): Thread.__init__(self, name="policy_updater") self.daemon = True + self._catch_up_timer = None self._aud_shutdown = None self._aud_catch_up = None + catch_up_config = Config.config.get("catch_up", {}) + self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 + self._catch_up_skips = 0 + self._catch_up_prev_message = None + self._need_to_catch_up = False + self._lock = Lock() self._queue = Queue() @@ -57,7 +71,7 @@ class PolicyUpdater(Thread): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policies_updated, policies_removed = self._queue.get() + queued_audit, policies_updated, policies_removed = self._queue.get() PolicyUpdater._logger.info( "got policies_updated %s policies_removed %s", json.dumps(policies_updated), json.dumps(policies_removed)) @@ -66,17 +80,22 @@ class PolicyUpdater(Thread): self._queue.task_done() break - if self._on_catch_up(audit) or not audit: + if self._on_catch_up(queued_audit) or not queued_audit: continue updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (audit, policies_updated, policies_removed)) + (queued_audit, policies_updated, policies_removed)) message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - DeployHandler.policy_update(audit, message) - audit.audit_done() + self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message) + + queued_audit.audit_done() self._queue.task_done() + if self._need_to_catch_up: + self._pause_catch_up_timer() + self.catch_up() + PolicyUpdater._logger.info("exit policy-updater") def _keep_running(self): @@ -88,14 +107,68 @@ class PolicyUpdater(Thread): self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit): + def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" PolicyUpdater._logger.info("catch_up requested") with self._lock: - self._aud_catch_up = audit + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) self.enqueue() + def _run_catch_up_timer(self): + """create and start the catch_up timer""" + if not self._catch_up_interval: + return + + if self._catch_up_timer: + self._catch_up_timer.next() + self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) + return + + self._catch_up_timer = StepTimer( + "catch_up_timer", + self._catch_up_interval, + PolicyUpdater.catch_up, + self + ) + self._catch_up_timer.start() + self._logger.info("started catch_up_timer in %s", self._catch_up_interval) + + def _pause_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("pause catch_up_timer") + self._catch_up_timer.pause() + + def _stop_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("stopping catch_up_timer") + self._catch_up_timer.stop() + self._catch_up_timer.join() + self._catch_up_timer = None + self._logger.info("stopped catch_up_timer") + + def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): + """try not to send the duplicate messages on auto catchup unless hitting the max count""" + if self._need_to_catch_up \ + or aud_catch_up.req_message != AUTO_CATCH_UP \ + or self._catch_up_skips >= self._catch_up_max_skips \ + or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): + self._catch_up_skips = 0 + self._need_to_catch_up = False + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + return True + + self._catch_up_skips += 1 + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "skip {0} sending the same catch_up: {1}".format( + self._catch_up_skips, self._catch_up_prev_message + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) + return False + def _reset_queue(self): """clear up the queue""" with self._lock: @@ -103,32 +176,34 @@ class PolicyUpdater(Thread): self._queue.task_done() self._queue = Queue() - def _on_catch_up(self, audit): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - aud_catch_up = self._aud_catch_up - if self._aud_catch_up: - self._aud_catch_up = None - self._lock.release() + def _on_catch_up(self, queued_audit): + """bring all the latest policies to DCAE-Controller""" + with self._lock: + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") + self._pause_catch_up_timer() - result = PolicyRest.get_latest_policies(aud_catch_up) - result[CATCH_UP] = True + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True if not aud_catch_up.is_success(): PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") - if not audit: + if not queued_audit: self._queue.task_done() - else: - DeployHandler.policy_update(aud_catch_up, result) + elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): + DeployHandler.policy_update(aud_catch_up, catch_up_message) self._reset_queue() success, _, _ = aud_catch_up.audit_done() PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + self._run_catch_up_timer() + return success def shutdown(self, audit): @@ -137,5 +212,8 @@ class PolicyUpdater(Thread): with self._lock: self._aud_shutdown = audit self.enqueue() + + self._stop_catch_up_timer() + if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index 652f98b..69978b6 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -18,11 +18,12 @@ """policy-client communicates with policy-engine thru REST API""" -import logging import json +import logging import re -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG +from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, + POLICY_VERSION) class PolicyUtils(object): """policy-client utils""" @@ -30,17 +31,6 @@ class PolicyUtils(object): _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except (ValueError, TypeError) as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - - @staticmethod def extract_policy_id(policy_name): """ policy_name = policy_id + "." + <version> + "." + <extension> For instance, @@ -65,7 +55,7 @@ class PolicyUtils(object): return policy config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) if config: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config) + policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config) return policy @staticmethod @@ -131,3 +121,54 @@ class PolicyUtils(object): policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) return policies + +class Utils(object): + """general purpose utils""" + _logger = logging.getLogger("policy_handler.utils") + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except (ValueError, TypeError) as err: + Utils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + return json_str + + @staticmethod + def are_the_same(body_1, body_2): + """check whether both objects are the same""" + if (body_1 and not body_2) or (not body_1 and body_2): + Utils._logger.debug("only one is empty %s != %s", body_1, body_2) + return False + + if body_1 is None and body_2 is None: + return True + + if isinstance(body_1, list) and isinstance(body_2, list): + if len(body_1) != len(body_2): + Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for val_1, val_2 in zip(body_1, body_2): + if not Utils.are_the_same(val_1, val_2): + return False + return True + + if isinstance(body_1, dict) and isinstance(body_2, dict): + if body_1.viewkeys() ^ body_2.viewkeys(): + Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for key, val_1 in body_1.iteritems(): + if not Utils.are_the_same(val_1, body_2[key]): + return False + return True + + # ... here when primitive values or mismatched types ... + the_same_values = (body_1 == body_2) + if not the_same_values: + Utils._logger.debug("values %s != %s", body_1, body_2) + return the_same_values diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py new file mode 100644 index 0000000..6e2b3f6 --- /dev/null +++ b/policyhandler/step_timer.py @@ -0,0 +1,63 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""periodically callback""" + +from threading import Event, Thread + + +class StepTimer(Thread): + """call on_time after interval number of seconds, then wait to continue""" + def __init__(self, name, interval, on_time, *args, **kwargs): + Thread.__init__(self, name=name) + self._interval = interval + self._on_time = on_time + self._args = args + self._kwargs = kwargs + + self._timeout = Event() + self._paused = Event() + self._continue = Event() + self._finished = Event() + + def next(self): + """continue with the next timeout""" + self._paused.clear() + self._continue.set() + + def pause(self): + """pause the timer""" + self._paused.set() + + def stop(self): + """stop the timer if it hasn't finished yet""" + self._finished.set() + self._timeout.set() + self._continue.set() + + def run(self): + """loop until stopped=finished""" + while True: + self._timeout.wait(self._interval) + if self._finished.is_set(): + break + self._timeout.clear() + self._continue.clear() + if not self._paused.is_set(): + self._on_time(*self._args, **self._kwargs) + self._continue.wait() |