From a2b262994a62286e1c98fd9939115c6e64ed27ee Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Tue, 13 Mar 2018 17:50:51 -0400 Subject: 2.3.0 policy-handler - periodically catch_up - periodically catchup - interval is configurable = max_skips defines the number of times the catch_up message that is identical to previous one can be skipped - do not catchup more often than the interval even between the manual catchup and auto catchup - do not send the same catchup message twice in a row to the deployment-handler but not exceed a hard limit on catchup max_skips - catchup if the deployment-handler instance is changed Change-Id: I9a3fcc941e8a9e553abb3952dd882c37e0f5fdfe Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-389 --- etc_upload/config.json | 4 ++ policyhandler/deploy_handler.py | 31 +++++++++-- policyhandler/policy_consts.py | 1 + policyhandler/policy_updater.py | 118 +++++++++++++++++++++++++++++++++------- policyhandler/policy_utils.py | 69 ++++++++++++++++++----- policyhandler/step_timer.py | 63 +++++++++++++++++++++ pom.xml | 2 +- setup.py | 2 +- tests/test_policyhandler.py | 39 ++----------- version.properties | 2 +- 10 files changed, 255 insertions(+), 76 deletions(-) create mode 100644 policyhandler/step_timer.py diff --git a/etc_upload/config.json b/etc_upload/config.json index e143773..920ef7c 100644 --- a/etc_upload/config.json +++ b/etc_upload/config.json @@ -6,6 +6,10 @@ "scope_prefixes" : ["DCAE_alex.Config_", "DCAE.Config_"], "policy_retry_count" : 5, "policy_retry_sleep" : 5, + "catch_up" : { + "interval" : 1200, + "max_skips" : 5 + }, "policy_engine" : { "url" : "https://peawiv9nspd01.pedc.sbc.com:8081", "path_pdp" :"/pdp/", diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 0950dfd..493b7d5 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -41,6 +41,7 @@ class DeployHandler(object): _url_path = None _target_entity = None _custom_kwargs = None + _server_instance_uuid = None @staticmethod def _lazy_init(audit): @@ -72,7 +73,11 @@ class DeployHandler(object): @staticmethod def policy_update(audit, message): - """post policy_updated message to deploy-handler""" + """ + post policy_updated message to deploy-handler + + returns condition whether it needs to catch_up + """ if not message: return @@ -116,8 +121,24 @@ class DeployHandler(object): sub_aud.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - sub_aud.metrics("response {0} from {1}: text={2}{3}" \ - .format(res.status_code, log_action, res.text, log_data)) + log_line = "response {0} from {1}: text={2}{3}" \ + .format(res.status_code, log_action, res.text, log_data) + sub_aud.metrics(log_line) + DeployHandler._logger.info(log_line) + + if res.status_code != requests.codes.ok: + return + + result = res.json() or {} + prev_server_instance_uuid = DeployHandler._server_instance_uuid + DeployHandler._server_instance_uuid = result.get("server_instance_uuid") + + need_to_catch_up = (prev_server_instance_uuid + and prev_server_instance_uuid != DeployHandler._server_instance_uuid) + if need_to_catch_up: + log_line = "need_to_catch_up: {1} != {0}" \ + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid) + sub_aud.info(log_line) + DeployHandler._logger.info(log_line) - if res.status_code == requests.codes.ok: - return res.json() + return need_to_catch_up diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 013ed2a..90ede47 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -25,6 +25,7 @@ POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' CATCH_UP = "catch_up" +AUTO_CATCH_UP = "auto catch_up" LATEST_POLICIES = "latest_policies" REMOVED_POLICIES = "removed_policies" ERRORED_POLICIES = "errored_policies" diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 01be550..e12af88 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -18,15 +18,21 @@ """policy-updater thread""" +import copy import json import logging from Queue import Queue from threading import Lock, Thread +from .config import Config from .deploy_handler import DeployHandler from .onap.audit import Audit -from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES +from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES, + REMOVED_POLICIES) from .policy_rest import PolicyRest +from .policy_utils import Utils +from .step_timer import StepTimer + class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -37,9 +43,17 @@ class PolicyUpdater(Thread): Thread.__init__(self, name="policy_updater") self.daemon = True + self._catch_up_timer = None self._aud_shutdown = None self._aud_catch_up = None + catch_up_config = Config.config.get("catch_up", {}) + self._catch_up_interval = catch_up_config.get("interval") or 15*60 + self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 + self._catch_up_skips = 0 + self._catch_up_prev_message = None + self._need_to_catch_up = False + self._lock = Lock() self._queue = Queue() @@ -57,7 +71,7 @@ class PolicyUpdater(Thread): """wait and run the policy-update in thread""" while True: PolicyUpdater._logger.info("waiting for policy-updates...") - audit, policies_updated, policies_removed = self._queue.get() + queued_audit, policies_updated, policies_removed = self._queue.get() PolicyUpdater._logger.info( "got policies_updated %s policies_removed %s", json.dumps(policies_updated), json.dumps(policies_removed)) @@ -66,17 +80,22 @@ class PolicyUpdater(Thread): self._queue.task_done() break - if self._on_catch_up(audit) or not audit: + if self._on_catch_up(queued_audit) or not queued_audit: continue updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (audit, policies_updated, policies_removed)) + (queued_audit, policies_updated, policies_removed)) message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - DeployHandler.policy_update(audit, message) - audit.audit_done() + self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message) + + queued_audit.audit_done() self._queue.task_done() + if self._need_to_catch_up: + self._pause_catch_up_timer() + self.catch_up() + PolicyUpdater._logger.info("exit policy-updater") def _keep_running(self): @@ -88,14 +107,68 @@ class PolicyUpdater(Thread): self._aud_shutdown.audit_done() return keep_running - def catch_up(self, audit): + def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" PolicyUpdater._logger.info("catch_up requested") with self._lock: - self._aud_catch_up = audit + self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) self.enqueue() + def _run_catch_up_timer(self): + """create and start the catch_up timer""" + if not self._catch_up_interval: + return + + if self._catch_up_timer: + self._catch_up_timer.next() + self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) + return + + self._catch_up_timer = StepTimer( + "catch_up_timer", + self._catch_up_interval, + PolicyUpdater.catch_up, + self + ) + self._catch_up_timer.start() + self._logger.info("started catch_up_timer in %s", self._catch_up_interval) + + def _pause_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("pause catch_up_timer") + self._catch_up_timer.pause() + + def _stop_catch_up_timer(self): + """stop catch_up_timer""" + if self._catch_up_timer: + self._logger.info("stopping catch_up_timer") + self._catch_up_timer.stop() + self._catch_up_timer.join() + self._catch_up_timer = None + self._logger.info("stopped catch_up_timer") + + def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): + """try not to send the duplicate messages on auto catchup unless hitting the max count""" + if self._need_to_catch_up \ + or aud_catch_up.req_message != AUTO_CATCH_UP \ + or self._catch_up_skips >= self._catch_up_max_skips \ + or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): + self._catch_up_skips = 0 + self._need_to_catch_up = False + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + return True + + self._catch_up_skips += 1 + self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "skip {0} sending the same catch_up: {1}".format( + self._catch_up_skips, self._catch_up_prev_message + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) + return False + def _reset_queue(self): """clear up the queue""" with self._lock: @@ -103,32 +176,34 @@ class PolicyUpdater(Thread): self._queue.task_done() self._queue = Queue() - def _on_catch_up(self, audit): - """Bring the latest policies to DCAE-Controller""" - self._lock.acquire() - aud_catch_up = self._aud_catch_up - if self._aud_catch_up: - self._aud_catch_up = None - self._lock.release() + def _on_catch_up(self, queued_audit): + """bring all the latest policies to DCAE-Controller""" + with self._lock: + aud_catch_up = self._aud_catch_up + if self._aud_catch_up: + self._aud_catch_up = None if not aud_catch_up: return False PolicyUpdater._logger.info("catch_up") + self._pause_catch_up_timer() - result = PolicyRest.get_latest_policies(aud_catch_up) - result[CATCH_UP] = True + catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) + catch_up_message[CATCH_UP] = True if not aud_catch_up.is_success(): PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") - if not audit: + if not queued_audit: self._queue.task_done() - else: - DeployHandler.policy_update(aud_catch_up, result) + elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): + DeployHandler.policy_update(aud_catch_up, catch_up_message) self._reset_queue() success, _, _ = aud_catch_up.audit_done() PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + self._run_catch_up_timer() + return success def shutdown(self, audit): @@ -137,5 +212,8 @@ class PolicyUpdater(Thread): with self._lock: self._aud_shutdown = audit self.enqueue() + + self._stop_catch_up_timer() + if self.is_alive(): self.join() diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index 652f98b..69978b6 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -18,28 +18,18 @@ """policy-client communicates with policy-engine thru REST API""" -import logging import json +import logging import re -from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG +from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, + POLICY_VERSION) class PolicyUtils(object): """policy-client utils""" _logger = logging.getLogger("policy_handler.policy_utils") _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - @staticmethod - def safe_json_parse(json_str): - """try parsing json without exception - returns the json_str back if fails""" - if not json_str: - return json_str - try: - return json.loads(json_str) - except (ValueError, TypeError) as err: - PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) - return json_str - @staticmethod def extract_policy_id(policy_name): """ policy_name = policy_id + "." + + "." + @@ -65,7 +55,7 @@ class PolicyUtils(object): return policy config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) if config: - policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config) + policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config) return policy @staticmethod @@ -131,3 +121,54 @@ class PolicyUtils(object): policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) return policies + +class Utils(object): + """general purpose utils""" + _logger = logging.getLogger("policy_handler.utils") + + @staticmethod + def safe_json_parse(json_str): + """try parsing json without exception - returns the json_str back if fails""" + if not json_str: + return json_str + try: + return json.loads(json_str) + except (ValueError, TypeError) as err: + Utils._logger.warn("unexpected json %s: %s", str(json_str), str(err)) + return json_str + + @staticmethod + def are_the_same(body_1, body_2): + """check whether both objects are the same""" + if (body_1 and not body_2) or (not body_1 and body_2): + Utils._logger.debug("only one is empty %s != %s", body_1, body_2) + return False + + if body_1 is None and body_2 is None: + return True + + if isinstance(body_1, list) and isinstance(body_2, list): + if len(body_1) != len(body_2): + Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for val_1, val_2 in zip(body_1, body_2): + if not Utils.are_the_same(val_1, val_2): + return False + return True + + if isinstance(body_1, dict) and isinstance(body_2, dict): + if body_1.viewkeys() ^ body_2.viewkeys(): + Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2)) + return False + + for key, val_1 in body_1.iteritems(): + if not Utils.are_the_same(val_1, body_2[key]): + return False + return True + + # ... here when primitive values or mismatched types ... + the_same_values = (body_1 == body_2) + if not the_same_values: + Utils._logger.debug("values %s != %s", body_1, body_2) + return the_same_values diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py new file mode 100644 index 0000000..6e2b3f6 --- /dev/null +++ b/policyhandler/step_timer.py @@ -0,0 +1,63 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""periodically callback""" + +from threading import Event, Thread + + +class StepTimer(Thread): + """call on_time after interval number of seconds, then wait to continue""" + def __init__(self, name, interval, on_time, *args, **kwargs): + Thread.__init__(self, name=name) + self._interval = interval + self._on_time = on_time + self._args = args + self._kwargs = kwargs + + self._timeout = Event() + self._paused = Event() + self._continue = Event() + self._finished = Event() + + def next(self): + """continue with the next timeout""" + self._paused.clear() + self._continue.set() + + def pause(self): + """pause the timer""" + self._paused.set() + + def stop(self): + """stop the timer if it hasn't finished yet""" + self._finished.set() + self._timeout.set() + self._continue.set() + + def run(self): + """loop until stopped=finished""" + while True: + self._timeout.wait(self._interval) + if self._finished.is_set(): + break + self._timeout.clear() + self._continue.clear() + if not self._paused.is_set(): + self._on_time(*self._args, **self._kwargs) + self._continue.wait() diff --git a/pom.xml b/pom.xml index 2ed4312..97b9df5 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - 2.2.0-SNAPSHOT + 2.3.0-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/setup.py b/setup.py index fa2bc73..3165504 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="2.2.0", + version="2.3.0", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index 375c358..b52e718 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -44,7 +44,7 @@ from policyhandler.policy_handler import LogWriter from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, REMOVED_POLICIES, PolicyReceiver) from policyhandler.policy_rest import PolicyRest -from policyhandler.policy_utils import PolicyUtils +from policyhandler.policy_utils import PolicyUtils, Utils from policyhandler.web_server import _PolicyWeb POLICY_HANDLER_VERSION = "2.2.0" @@ -150,31 +150,6 @@ class MonkeyPolicyBody(object): "property": None } - @staticmethod - def is_the_same_dict(policy_body_1, policy_body_2): - """check whether both policy_body objects are the same""" - if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict): - return False - for key in policy_body_1.keys(): - if key not in policy_body_2: - return False - - val_1 = policy_body_1[key] - val_2 = policy_body_2[key] - if isinstance(val_1, list) and isinstance(val_2, list): - if sorted(val_1) != sorted(val_2): - return False - continue - - if isinstance(val_1, dict) \ - and not MonkeyPolicyBody.is_the_same_dict(val_1, val_2): - return False - if (val_1 is None and val_2 is not None) \ - or (val_1 is not None and val_2 is None) \ - or (val_1 != val_2): - return False - return True - class MonkeyPolicyEngine(object): """pretend this is the policy-engine""" _scope_prefix = Config.config["scope_prefixes"][0] @@ -358,8 +333,7 @@ def test_get_policy_latest(fix_pdp_post): Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) Settings.logger.info("policy_latest: %s", json.dumps(policy_latest)) - assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy) - assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest) + assert Utils.are_the_same(policy_latest, expected_policy) def test_healthcheck(): """test /healthcheck""" @@ -422,8 +396,7 @@ class WebServerTest(CPWebCase): Settings.logger.info("policy_latest: %s", self.body) Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) - assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy) - assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest) + assert Utils.are_the_same(policy_latest, expected_policy) def test_web_all_policies_latest(self): """test GET /policies_latest""" @@ -441,8 +414,7 @@ class WebServerTest(CPWebCase): Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) - assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies) - assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest) + assert Utils.are_the_same(policies_latest, expected_policies) def test_web_policies_latest(self): """test POST /policies_latest with policyName""" @@ -466,8 +438,7 @@ class WebServerTest(CPWebCase): Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) - assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies) - assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest) + assert Utils.are_the_same(policies_latest, expected_policies) @pytest.mark.usefixtures( "fix_deploy_handler", diff --git a/version.properties b/version.properties index 5791c10..744a2a4 100644 --- a/version.properties +++ b/version.properties @@ -1,5 +1,5 @@ major=2 -minor=2 +minor=3 patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} -- cgit 1.2.3-korg