From 7e220c8dd68a969885287689c15d8dc711a06f8a Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Fri, 16 Mar 2018 12:30:08 -0400 Subject: 2.3.1 policy-handler - catch_up more often - enforce(do not skip) next catchup if failed to send to deployment-handler - more audit info and logging - cleaner queue handling in the policy-updater thread = queue reset and task_done on the to level in the run Change-Id: If3080d08f0f6560e3f0bd509fde94a7f8191b228 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-389 --- policyhandler/deploy_handler.py | 8 ++--- policyhandler/policy_rest.py | 2 +- policyhandler/policy_updater.py | 69 +++++++++++++++++++++++++++++------------ pom.xml | 2 +- setup.py | 2 +- version.properties | 2 +- 6 files changed, 57 insertions(+), 28 deletions(-) diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 493b7d5..f1a1294 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -133,12 +133,12 @@ class DeployHandler(object): prev_server_instance_uuid = DeployHandler._server_instance_uuid DeployHandler._server_instance_uuid = result.get("server_instance_uuid") - need_to_catch_up = (prev_server_instance_uuid + deployment_handler_changed = (prev_server_instance_uuid and prev_server_instance_uuid != DeployHandler._server_instance_uuid) - if need_to_catch_up: - log_line = "need_to_catch_up: {1} != {0}" \ + if deployment_handler_changed: + log_line = "deployment_handler_changed: {1} != {0}" \ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid) sub_aud.info(log_line) DeployHandler._logger.info(log_line) - return need_to_catch_up + return deployment_handler_changed diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index b11a79a..4eee154 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -444,7 +444,7 @@ class PolicyRest(object): result[ERRORED_POLICIES] = dict( pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems()) - result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp] + result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp]) PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", str_policy_filters, json.dumps(result)) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index e12af88..2c2f729 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -52,7 +52,6 @@ class PolicyUpdater(Thread): self._catch_up_max_skips = catch_up_config.get("max_skips") or 3 self._catch_up_skips = 0 self._catch_up_prev_message = None - self._need_to_catch_up = False self._lock = Lock() self._queue = Queue() @@ -63,7 +62,8 @@ class PolicyUpdater(Thread): policies_removed = policies_removed or [] PolicyUpdater._logger.info( - "policies_updated %s policies_removed %s", + "enqueue request_id %s policies_updated %s policies_removed %s", + ((audit and audit.request_id) or "none"), json.dumps(policies_updated), json.dumps(policies_removed)) self._queue.put((audit, policies_updated, policies_removed)) @@ -73,28 +73,37 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.info("waiting for policy-updates...") queued_audit, policies_updated, policies_removed = self._queue.get() PolicyUpdater._logger.info( - "got policies_updated %s policies_removed %s", + "got request_id %s policies_updated %s policies_removed %s", + ((queued_audit and queued_audit.request_id) or "none"), json.dumps(policies_updated), json.dumps(policies_removed)) if not self._keep_running(): self._queue.task_done() break - if self._on_catch_up(queued_audit) or not queued_audit: + if self._on_catch_up(): + self._reset_queue() + continue + elif not queued_audit: + self._queue.task_done() continue updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( (queued_audit, policies_updated, policies_removed)) message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} - self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message) - queued_audit.audit_done() + deployment_handler_changed = DeployHandler.policy_update(queued_audit, message) + self._queue.task_done() + queued_audit.audit_done() - if self._need_to_catch_up: + if deployment_handler_changed: + self._catch_up_prev_message = None self._pause_catch_up_timer() self.catch_up() + elif not queued_audit.is_success(): + self._catch_up_prev_message = None PolicyUpdater._logger.info("exit policy-updater") @@ -109,9 +118,10 @@ class PolicyUpdater(Thread): def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" - PolicyUpdater._logger.info("catch_up requested") with self._lock: self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) + PolicyUpdater._logger.info("catch_up %s request_id %s", + self._aud_catch_up.req_message, self._aud_catch_up.request_id) self.enqueue() @@ -151,19 +161,24 @@ class PolicyUpdater(Thread): def _need_to_send_catch_up(self, aud_catch_up, catch_up_message): """try not to send the duplicate messages on auto catchup unless hitting the max count""" - if self._need_to_catch_up \ - or aud_catch_up.req_message != AUTO_CATCH_UP \ + if aud_catch_up.req_message != AUTO_CATCH_UP \ or self._catch_up_skips >= self._catch_up_max_skips \ or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message): self._catch_up_skips = 0 - self._need_to_catch_up = False self._catch_up_prev_message = copy.deepcopy(catch_up_message) + log_line = "going to send the catch_up {0}: {1}".format( + aud_catch_up.req_message, + json.dumps(self._catch_up_prev_message) + ) + self._logger.info(log_line) + aud_catch_up.info(log_line) return True self._catch_up_skips += 1 self._catch_up_prev_message = copy.deepcopy(catch_up_message) - log_line = "skip {0} sending the same catch_up: {1}".format( - self._catch_up_skips, self._catch_up_prev_message + log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format( + self._catch_up_skips, self._catch_up_max_skips, + aud_catch_up.req_message, json.dumps(self._catch_up_prev_message) ) self._logger.info(log_line) aud_catch_up.info(log_line) @@ -176,7 +191,7 @@ class PolicyUpdater(Thread): self._queue.task_done() self._queue = Queue() - def _on_catch_up(self, queued_audit): + def _on_catch_up(self): """bring all the latest policies to DCAE-Controller""" with self._lock: aud_catch_up = self._aud_catch_up @@ -186,22 +201,36 @@ class PolicyUpdater(Thread): if not aud_catch_up: return False - PolicyUpdater._logger.info("catch_up") + log_line = "catch_up {0} request_id {1}".format( + aud_catch_up.req_message, aud_catch_up.request_id + ) + + PolicyUpdater._logger.info(log_line) self._pause_catch_up_timer() catch_up_message = PolicyRest.get_latest_policies(aud_catch_up) catch_up_message[CATCH_UP] = True + catch_up_result = "" if not aud_catch_up.is_success(): - PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") - if not queued_audit: - self._queue.task_done() + catch_up_result = "- not sending catch-up to deployment-handler due to errors" + PolicyUpdater._logger.warn(catch_up_result) elif self._need_to_send_catch_up(aud_catch_up, catch_up_message): DeployHandler.policy_update(aud_catch_up, catch_up_message) - self._reset_queue() - success, _, _ = aud_catch_up.audit_done() + if aud_catch_up.is_success(): + catch_up_result = "- sent catch-up to deployment-handler" + else: + catch_up_result = "- failed to send catch-up to deployment-handler" + PolicyUpdater._logger.warn(catch_up_result) + else: + catch_up_result = "- skipped sending the same policies" + success, _, _ = aud_catch_up.audit_done(result=catch_up_result) + PolicyUpdater._logger.info(log_line + " " + catch_up_result) PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) + if not success: + self._catch_up_prev_message = None + self._run_catch_up_timer() return success diff --git a/pom.xml b/pom.xml index 97b9df5..d49962f 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - 2.3.0-SNAPSHOT + 2.3.1-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/setup.py b/setup.py index 3165504..0481a1e 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="2.3.0", + version="2.3.1", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, diff --git a/version.properties b/version.properties index 744a2a4..2416ac8 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=2 minor=3 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg