aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_updater.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/policy_updater.py')
-rw-r--r--policyhandler/policy_updater.py144
1 files changed, 133 insertions, 11 deletions
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 7733146..8909cc7 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -22,7 +22,7 @@ import json
import logging
from threading import Event, Lock, Thread
-from .config import Config
+from .config import Config, Settings
from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
@@ -115,21 +115,45 @@ class PolicyUpdater(Thread):
"""sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
_logger = logging.getLogger("policy_handler.policy_updater")
- def __init__(self):
+ def __init__(self, on_reconfigure_receiver):
"""init static config of PolicyUpdater."""
- Thread.__init__(self, name="policy_updater")
- self.daemon = True
+ Thread.__init__(self, name="policy_updater", daemon=True)
+ self._reconfigure_receiver = on_reconfigure_receiver
self._lock = Lock()
self._run = Event()
+ self._settings = Settings(CATCH_UP, Config.RECONFIGURE)
self._catch_up_timer = None
+ self._reconfigure_timer = None
+
self._aud_shutdown = None
self._aud_catch_up = None
+ self._aud_reconfigure = None
self._policy_update = _PolicyUpdate()
- catch_up_config = Config.settings.get(CATCH_UP, {})
- self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_interval = None
+ self._reconfigure_interval = None
+ self._set_timer_intervals()
+
+ def _set_timer_intervals(self):
+ """set intervals on timers"""
+ self._settings.set_config(Config.discovered_config)
+ if not self._settings.is_changed():
+ self._settings.commit_change()
+ return False
+
+ _, catch_up = self._settings.get_by_key(CATCH_UP, {})
+ self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60
+
+ _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
+ self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60
+
+ PolicyUpdater._logger.info(
+ "intervals: catch_up(%s) reconfigure(%s): %s",
+ self._catch_up_interval, self._reconfigure_interval, self._settings)
+ self._settings.commit_change()
+ return True
def policy_update(self, policies_updated, policies_removed):
"""enqueue the policy-updates"""
@@ -148,8 +172,20 @@ class PolicyUpdater(Thread):
)
self._run.set()
+ def _reconfigure(self):
+ """job to check for and bring in the updated config for policy-handler"""
+ with self._lock:
+ if not self._aud_reconfigure:
+ self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
+ PolicyUpdater._logger.info(
+ "reconfigure %s request_id %s",
+ self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
+ )
+ self._run.set()
+
def run(self):
"""wait and run the policy-update in thread"""
+ self._run_reconfigure_timer()
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
self._run.wait()
@@ -160,6 +196,11 @@ class PolicyUpdater(Thread):
if not self._keep_running():
break
+ self._on_reconfigure()
+
+ if not self._keep_running():
+ break
+
if self._on_catch_up():
continue
@@ -183,7 +224,7 @@ class PolicyUpdater(Thread):
if self._catch_up_timer:
self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
- self._catch_up_timer.next()
+ self._catch_up_timer.next(self._catch_up_interval)
return
self._catch_up_timer = StepTimer(
@@ -196,14 +237,34 @@ class PolicyUpdater(Thread):
self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
self._catch_up_timer.start()
+ def _run_reconfigure_timer(self):
+ """create and start the reconfigure timer"""
+ if not self._reconfigure_interval:
+ return
+
+ if self._reconfigure_timer:
+ self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
+ self._reconfigure_timer.next(self._reconfigure_interval)
+ return
+
+ self._reconfigure_timer = StepTimer(
+ "reconfigure_timer",
+ self._reconfigure_interval,
+ PolicyUpdater._reconfigure,
+ PolicyUpdater._logger,
+ self
+ )
+ self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
+ self._reconfigure_timer.start()
+
def _pause_catch_up_timer(self):
"""pause catch_up_timer"""
if self._catch_up_timer:
self._logger.info("pause catch_up_timer")
self._catch_up_timer.pause()
- def _stop_catch_up_timer(self):
- """stop and destroy the catch_up_timer"""
+ def _stop_timers(self):
+ """stop and destroy the catch_up and reconfigure timers"""
if self._catch_up_timer:
self._logger.info("stopping catch_up_timer")
self._catch_up_timer.stop()
@@ -211,6 +272,66 @@ class PolicyUpdater(Thread):
self._catch_up_timer = None
self._logger.info("stopped catch_up_timer")
+ if self._reconfigure_timer:
+ self._logger.info("stopping reconfigure_timer")
+ self._reconfigure_timer.stop()
+ self._reconfigure_timer.join()
+ self._reconfigure_timer = None
+ self._logger.info("stopped reconfigure_timer")
+
+ def _on_reconfigure(self):
+ """bring the latest config and reconfigure"""
+ with self._lock:
+ aud_reconfigure = self._aud_reconfigure
+ if self._aud_reconfigure:
+ self._aud_reconfigure = None
+
+ if not aud_reconfigure:
+ return
+
+ log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id)
+ reconfigure_result = ""
+ try:
+ PolicyUpdater._logger.info(log_line)
+ Config.discover(aud_reconfigure)
+ if not Config.discovered_config.is_changed():
+ reconfigure_result = " -- config not changed"
+ else:
+ reconfigure_result = " -- config changed for:"
+ if self._set_timer_intervals():
+ reconfigure_result += " timer_intervals"
+
+ if PolicyRest.reconfigure():
+ reconfigure_result += " " + Config.FIELD_POLICY_ENGINE
+
+ if DeployHandler.reconfigure(aud_reconfigure):
+ reconfigure_result += " " + Config.DEPLOY_HANDLER
+
+ if self._reconfigure_receiver():
+ reconfigure_result += " web-socket"
+
+ reconfigure_result += " -- change: {}".format(Config.discovered_config)
+
+ Config.discovered_config.commit_change()
+ aud_reconfigure.audit_done(result=reconfigure_result)
+ PolicyUpdater._logger.info(log_line + reconfigure_result)
+
+ except Exception as ex:
+ error_msg = "crash {} {}{}: {}: {}".format(
+ "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))
+
+ PolicyUpdater._logger.exception(error_msg)
+ aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ aud_reconfigure.audit_done(result=error_msg)
+
+ self._run_reconfigure_timer()
+
+ PolicyUpdater._logger.info("policy_handler health: %s",
+ json.dumps(aud_reconfigure.health(full=True)))
+ PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
+
+
def _on_catch_up(self):
"""bring all the latest policies to DCAE-Controller"""
with self._lock:
@@ -239,7 +360,7 @@ class PolicyUpdater(Thread):
catch_up_result = "- not sending empty catch-up to deployment-handler"
else:
aud_catch_up.reset_http_status_not_found()
- DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+ DeployHandler.policy_update(aud_catch_up, catch_up_message)
if not aud_catch_up.is_success():
catch_up_result = "- failed to send catch-up to deployment-handler"
PolicyUpdater._logger.warning(catch_up_result)
@@ -256,6 +377,7 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.exception(error_msg)
aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ aud_catch_up.audit_done(result=error_msg)
success = False
self._run_catch_up_timer()
@@ -342,7 +464,7 @@ class PolicyUpdater(Thread):
self._aud_shutdown = audit
self._run.set()
- self._stop_catch_up_timer()
+ self._stop_timers()
if self.is_alive():
self.join()