aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-03-16 12:30:08 -0400
committerAlex Shatov <alexs@att.com>2018-03-16 12:30:08 -0400
commit7e220c8dd68a969885287689c15d8dc711a06f8a (patch)
tree45afeb4e756865f3fac0a6c0c73b90b821d0ff75
parenta2b262994a62286e1c98fd9939115c6e64ed27ee (diff)
2.3.1 policy-handler - catch_up more often
- enforce(do not skip) next catchup if failed to send to deployment-handler - more audit info and logging - cleaner queue handling in the policy-updater thread = queue reset and task_done on the to level in the run Change-Id: If3080d08f0f6560e3f0bd509fde94a7f8191b228 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-389
-rw-r--r--policyhandler/deploy_handler.py8
-rw-r--r--policyhandler/policy_rest.py2
-rw-r--r--policyhandler/policy_updater.py69
-rw-r--r--pom.xml2
-rw-r--r--setup.py2
-rw-r--r--version.properties2
6 files changed, 57 insertions, 28 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 493b7d5..f1a1294 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -133,12 +133,12 @@ class DeployHandler(object):
prev_server_instance_uuid = DeployHandler._server_instance_uuid
DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
- need_to_catch_up = (prev_server_instance_uuid
+ deployment_handler_changed = (prev_server_instance_uuid
and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
- if need_to_catch_up:
- log_line = "need_to_catch_up: {1} != {0}" \
+ if deployment_handler_changed:
+ log_line = "deployment_handler_changed: {1} != {0}" \
.format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)
sub_aud.info(log_line)
DeployHandler._logger.info(log_line)
- return need_to_catch_up
+ return deployment_handler_changed
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index b11a79a..4eee154 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -444,7 +444,7 @@ class PolicyRest(object):
result[ERRORED_POLICIES] = dict(
pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
- result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp]
+ result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
str_policy_filters, json.dumps(result))
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index e12af88..2c2f729 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -52,7 +52,6 @@ class PolicyUpdater(Thread):
self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
self._catch_up_skips = 0
self._catch_up_prev_message = None
- self._need_to_catch_up = False
self._lock = Lock()
self._queue = Queue()
@@ -63,7 +62,8 @@ class PolicyUpdater(Thread):
policies_removed = policies_removed or []
PolicyUpdater._logger.info(
- "policies_updated %s policies_removed %s",
+ "enqueue request_id %s policies_updated %s policies_removed %s",
+ ((audit and audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
self._queue.put((audit, policies_updated, policies_removed))
@@ -73,28 +73,37 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.info("waiting for policy-updates...")
queued_audit, policies_updated, policies_removed = self._queue.get()
PolicyUpdater._logger.info(
- "got policies_updated %s policies_removed %s",
+ "got request_id %s policies_updated %s policies_removed %s",
+ ((queued_audit and queued_audit.request_id) or "none"),
json.dumps(policies_updated), json.dumps(policies_removed))
if not self._keep_running():
self._queue.task_done()
break
- if self._on_catch_up(queued_audit) or not queued_audit:
+ if self._on_catch_up():
+ self._reset_queue()
+ continue
+ elif not queued_audit:
+ self._queue.task_done()
continue
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
(queued_audit, policies_updated, policies_removed))
message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message)
- queued_audit.audit_done()
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+
self._queue.task_done()
+ queued_audit.audit_done()
- if self._need_to_catch_up:
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
self._pause_catch_up_timer()
self.catch_up()
+ elif not queued_audit.is_success():
+ self._catch_up_prev_message = None
PolicyUpdater._logger.info("exit policy-updater")
@@ -109,9 +118,10 @@ class PolicyUpdater(Thread):
def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
- PolicyUpdater._logger.info("catch_up requested")
with self._lock:
self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info("catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id)
self.enqueue()
@@ -151,19 +161,24 @@ class PolicyUpdater(Thread):
def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
"""try not to send the duplicate messages on auto catchup unless hitting the max count"""
- if self._need_to_catch_up \
- or aud_catch_up.req_message != AUTO_CATCH_UP \
+ if aud_catch_up.req_message != AUTO_CATCH_UP \
or self._catch_up_skips >= self._catch_up_max_skips \
or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
self._catch_up_skips = 0
- self._need_to_catch_up = False
self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "going to send the catch_up {0}: {1}".format(
+ aud_catch_up.req_message,
+ json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
return True
self._catch_up_skips += 1
self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "skip {0} sending the same catch_up: {1}".format(
- self._catch_up_skips, self._catch_up_prev_message
+ log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
+ self._catch_up_skips, self._catch_up_max_skips,
+ aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
)
self._logger.info(log_line)
aud_catch_up.info(log_line)
@@ -176,7 +191,7 @@ class PolicyUpdater(Thread):
self._queue.task_done()
self._queue = Queue()
- def _on_catch_up(self, queued_audit):
+ def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
aud_catch_up = self._aud_catch_up
@@ -186,22 +201,36 @@ class PolicyUpdater(Thread):
if not aud_catch_up:
return False
- PolicyUpdater._logger.info("catch_up")
+ log_line = "catch_up {0} request_id {1}".format(
+ aud_catch_up.req_message, aud_catch_up.request_id
+ )
+
+ PolicyUpdater._logger.info(log_line)
self._pause_catch_up_timer()
catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
catch_up_message[CATCH_UP] = True
+ catch_up_result = ""
if not aud_catch_up.is_success():
- PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
- if not queued_audit:
- self._queue.task_done()
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
+ PolicyUpdater._logger.warn(catch_up_result)
elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
DeployHandler.policy_update(aud_catch_up, catch_up_message)
- self._reset_queue()
- success, _, _ = aud_catch_up.audit_done()
+ if aud_catch_up.is_success():
+ catch_up_result = "- sent catch-up to deployment-handler"
+ else:
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warn(catch_up_result)
+ else:
+ catch_up_result = "- skipped sending the same policies"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ if not success:
+ self._catch_up_prev_message = None
+
self._run_catch_up_timer()
return success
diff --git a/pom.xml b/pom.xml
index 97b9df5..d49962f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform</groupId>
<artifactId>policy-handler</artifactId>
<name>dcaegen2-platform-policy-handler</name>
- <version>2.3.0-SNAPSHOT</version>
+ <version>2.3.1-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/setup.py b/setup.py
index 3165504..0481a1e 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='policyhandler',
description='DCAE-Controller policy-handler to communicate with policy-engine',
- version="2.3.0",
+ version="2.3.1",
author='Alex Shatov',
packages=['policyhandler'],
zip_safe=False,
diff --git a/version.properties b/version.properties
index 744a2a4..2416ac8 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=2
minor=3
-patch=0
+patch=1
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT