summaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py256
1 files changed, 153 insertions, 103 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 5ba7c29..3ae8199 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -18,24 +18,101 @@
"""policy-updater thread"""
-import copy
import json
import logging
-from queue import Queue
-from threading import Lock, Thread
+from threading import Event, Lock, Thread
from .config import Config
-from .deploy_handler import DeployHandler
+from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
-from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
- REMOVED_POLICIES)
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
+ POLICY_NAME, POLICY_NAMES, POLICY_VERSION)
+from .policy_matcher import PolicyMatcher
from .policy_rest import PolicyRest
-from .policy_utils import Utils
+from .policy_utils import PolicyUtils
from .step_timer import StepTimer
+class _PolicyUpdate(object):
+ """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
+ _logger = logging.getLogger("policy_handler.policy_update")
+
+ def __init__(self):
+ """init and reset"""
+ self._audit = None
+ self._policies_updated = {}
+ self._policies_removed = {}
+
+ def reset(self):
+ """resets the state"""
+ self.__init__()
+
+ def pop_policy_update(self):
+ """
+ Returns the consolidated (audit, policies_updated, policies_removed)
+ and resets the state
+ """
+ if not self._audit:
+ return None, None, None
+
+ audit = self._audit
+ policies_updated = self._policies_updated
+ policies_removed = self._policies_removed
+
+ self.reset()
+
+ return audit, policies_updated, policies_removed
+
+
+ def push_policy_update(self, policies_updated, policies_removed):
+ """consolidate the new policies_updated, policies_removed to existing ones"""
+ for policy_body in policies_updated:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
+ for policy_body in policies_removed:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ if policy_id in self._policies_removed:
+ policy = self._policies_removed[policy_id]
+
+ if POLICY_NAMES not in policy:
+ policy[POLICY_NAMES] = {}
+ policy[POLICY_NAMES][policy_name] = True
+ self._policies_removed[policy_id] = policy
+
+ req_message = ("policy-update notification - updated[{0}], removed[{1}]"
+ .format(len(self._policies_updated),
+ len(self._policies_removed)))
+
+ if not self._audit:
+ self._audit = Audit(job_name="policy_update",
+ req_message=req_message,
+ retry_get_config=True)
+ else:
+ self._audit.req_message = req_message
+
+ self._logger.info(
+ "pending request_id %s for %s policies_updated %s policies_removed %s",
+ self._audit.request_id, req_message,
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+
class PolicyUpdater(Thread):
- """queue and handle the policy-updates in a separate thread"""
+ """sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
_logger = logging.getLogger("policy_handler.policy_updater")
def __init__(self):
@@ -43,33 +120,22 @@ class PolicyUpdater(Thread):
Thread.__init__(self, name="policy_updater")
self.daemon = True
+ self._lock = Lock()
+ self._run = Event()
+
self._catch_up_timer = None
self._aud_shutdown = None
self._aud_catch_up = None
+ self._policy_update = _PolicyUpdate()
catch_up_config = Config.settings.get(CATCH_UP, {})
self._catch_up_interval = catch_up_config.get("interval") or 15*60
- self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
- self._catch_up_skips = 0
- self._catch_up_prev_message = None
-
- self._lock = Lock()
- self._queue = Queue()
-
- def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ def policy_update(self, policies_updated, policies_removed):
"""enqueue the policy-updates"""
- policies_updated = policies_updated or []
- policies_removed = policies_removed or []
-
- PolicyUpdater._logger.info(
- "enqueue request_id %s policies_updated %s policies_removed %s",
- ((audit and audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
-
with self._lock:
- self._queue.put((audit, policies_updated, policies_removed))
-
+ self._policy_update.push_policy_update(policies_updated, policies_removed)
+ self._run.set()
def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
@@ -80,30 +146,24 @@ class PolicyUpdater(Thread):
"catch_up %s request_id %s",
self._aud_catch_up.req_message, self._aud_catch_up.request_id
)
-
- self.enqueue()
-
+ self._run.set()
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- queued_audit, policies_updated, policies_removed = self._queue.get()
- PolicyUpdater._logger.info(
- "got request_id %s policies_updated %s policies_removed %s",
- ((queued_audit and queued_audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
+ self._run.wait()
+
+ with self._lock:
+ self._run.clear()
if not self._keep_running():
break
if self._on_catch_up():
- self._reset_queue()
- continue
- elif not queued_audit:
continue
- self._on_policies_update(queued_audit, policies_updated, policies_removed)
+ self._on_policy_update()
PolicyUpdater._logger.info("exit policy-updater")
@@ -151,44 +211,13 @@ class PolicyUpdater(Thread):
self._catch_up_timer = None
self._logger.info("stopped catch_up_timer")
- def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
- """try not to send the duplicate messages on auto catchup unless hitting the max count"""
- if aud_catch_up.req_message != AUTO_CATCH_UP \
- or self._catch_up_skips >= self._catch_up_max_skips \
- or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
- self._catch_up_skips = 0
- self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "going to send the catch_up {0}: {1}".format(
- aud_catch_up.req_message,
- json.dumps(self._catch_up_prev_message)
- )
- self._logger.info(log_line)
- aud_catch_up.info(log_line)
- return True
-
- self._catch_up_skips += 1
- self._catch_up_prev_message = copy.deepcopy(catch_up_message)
- log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
- self._catch_up_skips, self._catch_up_max_skips,
- aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
- )
- self._logger.info(log_line)
- aud_catch_up.info(log_line)
- return False
-
- def _reset_queue(self):
- """clear up the queue"""
- with self._lock:
- if not self._aud_catch_up and not self._aud_shutdown:
- with self._queue.mutex:
- self._queue.queue.clear()
-
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
aud_catch_up = self._aud_catch_up
if self._aud_catch_up:
self._aud_catch_up = None
+ self._policy_update.reset()
if not aud_catch_up:
return False
@@ -196,20 +225,20 @@ class PolicyUpdater(Thread):
log_line = "catch_up {0} request_id {1}".format(
aud_catch_up.req_message, aud_catch_up.request_id
)
+ catch_up_result = ""
try:
PolicyUpdater._logger.info(log_line)
self._pause_catch_up_timer()
- catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
- catch_up_message[CATCH_UP] = True
+ _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up)
- catch_up_result = ""
- if not aud_catch_up.is_success():
+ if not catch_up_message or not aud_catch_up.is_success_or_not_found():
catch_up_result = "- not sending catch-up to deployment-handler due to errors"
PolicyUpdater._logger.warning(catch_up_result)
- elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
- catch_up_result = "- skipped sending the same policies"
+ elif catch_up_message.empty():
+ catch_up_result = "- not sending empty catch-up to deployment-handler"
else:
+ aud_catch_up.reset_http_status_not_found()
DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
if not aud_catch_up.is_success():
catch_up_result = "- failed to send catch-up to deployment-handler"
@@ -229,9 +258,6 @@ class PolicyUpdater(Thread):
aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
success = False
- if not success:
- self._catch_up_prev_message = None
-
self._run_catch_up_timer()
PolicyUpdater._logger.info("policy_handler health: %s",
@@ -240,49 +266,73 @@ class PolicyUpdater(Thread):
return success
- def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
- """handle the event of policy-updates from the queue"""
- deployment_handler_changed = None
+ def _on_policy_update(self):
+ """handle the event of policy-updates"""
result = ""
+ with self._lock:
+ audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()
+
+ if not audit:
+ return
log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
- ((queued_audit and queued_audit.request_id) or "none"),
- json.dumps(policies_updated), json.dumps(policies_removed))
+ audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
+ PolicyUpdater._logger.info(log_line)
try:
- updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (queued_audit, policies_updated, policies_removed))
-
- if not queued_audit.is_success():
+ (updated_policies, removed_policies,
+ policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
+ audit, policies_updated, policies_removed)
+
+ if updated_policies or removed_policies:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (audit,
+ [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
+ for policy_id, policy in updated_policies.items()],
+ [(policy_id, policy.get(POLICY_NAMES, {}))
+ for policy_id, policy in removed_policies.items()]
+ ))
+
+ if not audit.is_success_or_not_found():
result = "- not sending policy-updates to deployment-handler due to errors"
PolicyUpdater._logger.warning(result)
+ elif not updated_policies and not removed_policies:
+ result = "- not sending empty policy-updates to deployment-handler"
+ PolicyUpdater._logger.warning(result)
else:
- message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
- if not queued_audit.is_success():
- result = "- failed to send policy-updates to deployment-handler"
+ message = PolicyUpdateMessage(updated_policies, removed_policies,
+ policy_filter_matches, False)
+ log_updates = ("policies-updated[{0}], removed[{1}]"
+ .format(len(updated_policies), len(removed_policies)))
+
+ audit.reset_http_status_not_found()
+ DeployHandler.policy_update(audit, message)
+
+ if not audit.is_success():
+ result = "- failed to send to deployment-handler {}".format(log_updates)
PolicyUpdater._logger.warning(result)
else:
- result = "- sent policy-updates to deployment-handler"
+ result = "- sent to deployment-handler {}".format(log_updates)
+ log_line = "request_id: {} updated_policies: {} removed_policies: {}".format(
+ audit.request_id,
+ json.dumps(updated_policies), json.dumps(removed_policies))
- success, _, _ = queued_audit.audit_done(result=result)
+ audit.audit_done(result=result)
+ PolicyUpdater._logger.info(log_line + " " + result)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
- .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ .format(audit.request_id, type(ex).__name__, str(ex),
"on_policies_update", log_line + " " + result))
PolicyUpdater._logger.exception(error_msg)
- queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
- queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- success = False
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- if deployment_handler_changed:
- self._catch_up_prev_message = None
+ if DeployHandler.server_instance_changed:
+ DeployHandler.server_instance_changed = False
self._pause_catch_up_timer()
self.catch_up()
- elif not success:
- self._catch_up_prev_message = None
def shutdown(self, audit):
@@ -290,7 +340,7 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.info("shutdown policy-updater")
with self._lock:
self._aud_shutdown = audit
- self.enqueue()
+ self._run.set()
self._stop_catch_up_timer()