aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py296
1 files changed, 235 insertions, 61 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 1f1539f..5ba7c29 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,8 +1,5 @@
-"""policy-updater thread"""
-
-# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,13 +16,23 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
+"""policy-updater thread"""
+
+import copy
import json
-from Queue import Queue
-from threading import Thread, Lock
+import logging
+from queue import Queue
+from threading import Lock, Thread
-from .policy_rest import PolicyRest
+from .config import Config
from .deploy_handler import DeployHandler
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
+ REMOVED_POLICIES)
+from .policy_rest import PolicyRest
+from .policy_utils import Utils
+from .step_timer import StepTimer
+
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -33,92 +40,259 @@ class PolicyUpdater(Thread):
def __init__(self):
"""init static config of PolicyUpdater."""
- Thread.__init__(self)
- self.name = "policy_updater"
+ Thread.__init__(self, name="policy_updater")
self.daemon = True
- self._req_shutdown = None
- self._req_catch_up = None
+ self._catch_up_timer = None
+ self._aud_shutdown = None
+ self._aud_catch_up = None
+
+ catch_up_config = Config.settings.get(CATCH_UP, {})
+ self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = None
self._lock = Lock()
self._queue = Queue()
- def enqueue(self, audit=None, policy_names=None):
- """enqueue the policy-names"""
- policy_names = policy_names or []
- PolicyUpdater._logger.info("policy_names %s", json.dumps(policy_names))
- self._queue.put((audit, policy_names))
+
+ def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
+ """enqueue the policy-updates"""
+ policies_updated = policies_updated or []
+ policies_removed = policies_removed or []
+
+ PolicyUpdater._logger.info(
+ "enqueue request_id %s policies_updated %s policies_removed %s",
+ ((audit and audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ with self._lock:
+ self._queue.put((audit, policies_updated, policies_removed))
+
+
+ def catch_up(self, audit=None):
+ """need to bring the latest policies to DCAE-Controller"""
+ with self._lock:
+ if not self._aud_catch_up:
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+ PolicyUpdater._logger.info(
+ "catch_up %s request_id %s",
+ self._aud_catch_up.req_message, self._aud_catch_up.request_id
+ )
+
+ self.enqueue()
+
def run(self):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policy_names = self._queue.get()
- PolicyUpdater._logger.info("got policy-updates %s", json.dumps(policy_names))
+ queued_audit, policies_updated, policies_removed = self._queue.get()
+ PolicyUpdater._logger.info(
+ "got request_id %s policies_updated %s policies_removed %s",
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
if not self._keep_running():
- self._queue.task_done()
break
+
if self._on_catch_up():
+ self._reset_queue()
continue
-
- if not policy_names:
- self._queue.task_done()
+ elif not queued_audit:
continue
- updated_policies = PolicyRest.get_latest_policies_by_names((audit, policy_names))
- PolicyUpdater.policy_update(audit, updated_policies)
- audit.audit_done()
- self._queue.task_done()
+ self._on_policies_update(queued_audit, policies_updated, policies_removed)
PolicyUpdater._logger.info("exit policy-updater")
def _keep_running(self):
"""thread-safe check whether to continue running"""
- self._lock.acquire()
- keep_running = not self._req_shutdown
- self._lock.release()
- if self._req_shutdown:
- self._req_shutdown.audit_done()
+ with self._lock:
+ keep_running = not self._aud_shutdown
+
+ if self._aud_shutdown:
+ self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit):
- """need to bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- self._req_catch_up = audit
- self._lock.release()
- self.enqueue()
+ def _run_catch_up_timer(self):
+ """create and start the catch_up timer"""
+ if not self._catch_up_interval:
+ return
+
+ if self._catch_up_timer:
+ self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+ self._catch_up_timer.next()
+ return
+
+ self._catch_up_timer = StepTimer(
+ "catch_up_timer",
+ self._catch_up_interval,
+ PolicyUpdater.catch_up,
+ PolicyUpdater._logger,
+ self
+ )
+ self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+ self._catch_up_timer.start()
+
+ def _pause_catch_up_timer(self):
+ """pause catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("pause catch_up_timer")
+ self._catch_up_timer.pause()
+
+ def _stop_catch_up_timer(self):
+ """stop and destroy the catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("stopping catch_up_timer")
+ self._catch_up_timer.stop()
+ self._catch_up_timer.join()
+ self._catch_up_timer = None
+ self._logger.info("stopped catch_up_timer")
+
+ def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
+ """try not to send the duplicate messages on auto catchup unless hitting the max count"""
+ if aud_catch_up.req_message != AUTO_CATCH_UP \
+ or self._catch_up_skips >= self._catch_up_max_skips \
+ or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "going to send the catch_up {0}: {1}".format(
+ aud_catch_up.req_message,
+ json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return True
+
+ self._catch_up_skips += 1
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "skip {0}/{1} sending the same catch_up {2}: {3}".format(
+ self._catch_up_skips, self._catch_up_max_skips,
+ aud_catch_up.req_message, json.dumps(self._catch_up_prev_message)
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return False
+
+ def _reset_queue(self):
+ """clear up the queue"""
+ with self._lock:
+ if not self._aud_catch_up and not self._aud_shutdown:
+ with self._queue.mutex:
+ self._queue.queue.clear()
def _on_catch_up(self):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- req_catch_up = self._req_catch_up
- if self._req_catch_up:
- self._req_catch_up = None
- self._queue.task_done()
- self._queue = Queue()
- self._lock.release()
- if not req_catch_up:
+ """bring all the latest policies to DCAE-Controller"""
+ with self._lock:
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
+
+ if not aud_catch_up:
return False
- PolicyUpdater._logger.info("catch_up")
- latest_policies = PolicyRest.get_latest_policies(req_catch_up)
- PolicyUpdater.policy_update(req_catch_up, latest_policies)
- req_catch_up.audit_done()
- return True
+ log_line = "catch_up {0} request_id {1}".format(
+ aud_catch_up.req_message, aud_catch_up.request_id
+ )
+ try:
+ PolicyUpdater._logger.info(log_line)
+ self._pause_catch_up_timer()
+
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
+
+ catch_up_result = ""
+ if not aud_catch_up.is_success():
+ catch_up_result = "- not sending catch-up to deployment-handler due to errors"
+ PolicyUpdater._logger.warning(catch_up_result)
+ elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ catch_up_result = "- skipped sending the same policies"
+ else:
+ DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ if not aud_catch_up.is_success():
+ catch_up_result = "- failed to send catch-up to deployment-handler"
+ PolicyUpdater._logger.warning(catch_up_result)
+ else:
+ catch_up_result = "- sent catch-up to deployment-handler"
+ success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+ PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
+ "on_catch_up", log_line + " " + catch_up_result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if not success:
+ self._catch_up_prev_message = None
+
+ self._run_catch_up_timer()
+
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_catch_up.health(full=True)))
+ PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+ return success
+
+
+ def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
+ """handle the event of policy-updates from the queue"""
+ deployment_handler_changed = None
+ result = ""
+
+ log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
+ ((queued_audit and queued_audit.request_id) or "none"),
+ json.dumps(policies_updated), json.dumps(policies_removed))
+
+ try:
+ updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+ (queued_audit, policies_updated, policies_removed))
+
+ if not queued_audit.is_success():
+ result = "- not sending policy-updates to deployment-handler due to errors"
+ PolicyUpdater._logger.warning(result)
+ else:
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+ if not queued_audit.is_success():
+ result = "- failed to send policy-updates to deployment-handler"
+ PolicyUpdater._logger.warning(result)
+ else:
+ result = "- sent policy-updates to deployment-handler"
+
+ success, _, _ = queued_audit.audit_done(result=result)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(queued_audit.request_id, type(ex).__name__, str(ex),
+ "on_policies_update", log_line + " " + result))
+
+ PolicyUpdater._logger.exception(error_msg)
+ queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ success = False
+
+ if deployment_handler_changed:
+ self._catch_up_prev_message = None
+ self._pause_catch_up_timer()
+ self.catch_up()
+ elif not success:
+ self._catch_up_prev_message = None
- @staticmethod
- def policy_update(audit, updated_policies):
- """Invoke deploy-handler"""
- if updated_policies:
- PolicyUpdater._logger.info("updated_policies %s", json.dumps(updated_policies))
- DeployHandler.policy_update(audit, updated_policies)
def shutdown(self, audit):
"""Shutdown the policy-updater"""
PolicyUpdater._logger.info("shutdown policy-updater")
- self._lock.acquire()
- self._req_shutdown = audit
- self._lock.release()
+ with self._lock:
+ self._aud_shutdown = audit
self.enqueue()
+
+ self._stop_catch_up_timer()
+
if self.is_alive():
self.join()