aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-03-13 17:50:51 -0400
committerAlex Shatov <alexs@att.com>2018-03-13 17:50:51 -0400
commita2b262994a62286e1c98fd9939115c6e64ed27ee (patch)
treef6ef7056c70a34485d1ed047a296b89bb3354ee2 /policyhandler
parentb9b955c0c2875effc1ce02d5576f0d2a59284c5d (diff)
2.3.0 policy-handler - periodically catch_up
- periodically catchup - interval is configurable = max_skips defines the number of times the catch_up message that is identical to previous one can be skipped - do not catchup more often than the interval even between the manual catchup and auto catchup - do not send the same catchup message twice in a row to the deployment-handler but not exceed a hard limit on catchup max_skips - catchup if the deployment-handler instance is changed Change-Id: I9a3fcc941e8a9e553abb3952dd882c37e0f5fdfe Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-389
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/deploy_handler.py31
-rw-r--r--policyhandler/policy_consts.py1
-rw-r--r--policyhandler/policy_updater.py118
-rw-r--r--policyhandler/policy_utils.py69
-rw-r--r--policyhandler/step_timer.py63
5 files changed, 243 insertions, 39 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 0950dfd..493b7d5 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -41,6 +41,7 @@ class DeployHandler(object):
_url_path = None
_target_entity = None
_custom_kwargs = None
+ _server_instance_uuid = None
@staticmethod
def _lazy_init(audit):
@@ -72,7 +73,11 @@ class DeployHandler(object):
@staticmethod
def policy_update(audit, message):
- """post policy_updated message to deploy-handler"""
+ """
+ post policy_updated message to deploy-handler
+
+ returns condition whether it needs to catch_up
+ """
if not message:
return
@@ -116,8 +121,24 @@ class DeployHandler(object):
sub_aud.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- sub_aud.metrics("response {0} from {1}: text={2}{3}" \
- .format(res.status_code, log_action, res.text, log_data))
+ log_line = "response {0} from {1}: text={2}{3}" \
+ .format(res.status_code, log_action, res.text, log_data)
+ sub_aud.metrics(log_line)
+ DeployHandler._logger.info(log_line)
+
+ if res.status_code != requests.codes.ok:
+ return
+
+ result = res.json() or {}
+ prev_server_instance_uuid = DeployHandler._server_instance_uuid
+ DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
+
+ need_to_catch_up = (prev_server_instance_uuid
+ and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
+ if need_to_catch_up:
+ log_line = "need_to_catch_up: {1} != {0}" \
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)
+ sub_aud.info(log_line)
+ DeployHandler._logger.info(log_line)
- if res.status_code == requests.codes.ok:
- return res.json()
+ return need_to_catch_up
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 013ed2a..90ede47 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -25,6 +25,7 @@ POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
CATCH_UP = "catch_up"
+AUTO_CATCH_UP = "auto catch_up"
LATEST_POLICIES = "latest_policies"
REMOVED_POLICIES = "removed_policies"
ERRORED_POLICIES = "errored_policies"
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 01be550..e12af88 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -18,15 +18,21 @@
"""policy-updater thread"""
+import copy
import json
import logging
from Queue import Queue
from threading import Lock, Thread
+from .config import Config
from .deploy_handler import DeployHandler
from .onap.audit import Audit
-from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
+ REMOVED_POLICIES)
from .policy_rest import PolicyRest
+from .policy_utils import Utils
+from .step_timer import StepTimer
+
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -37,9 +43,17 @@ class PolicyUpdater(Thread):
Thread.__init__(self, name="policy_updater")
self.daemon = True
+ self._catch_up_timer = None
self._aud_shutdown = None
self._aud_catch_up = None
+ catch_up_config = Config.config.get("catch_up", {})
+ self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = None
+ self._need_to_catch_up = False
+
self._lock = Lock()
self._queue = Queue()
@@ -57,7 +71,7 @@ class PolicyUpdater(Thread):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policies_updated, policies_removed = self._queue.get()
+ queued_audit, policies_updated, policies_removed = self._queue.get()
PolicyUpdater._logger.info(
"got policies_updated %s policies_removed %s",
json.dumps(policies_updated), json.dumps(policies_removed))
@@ -66,17 +80,22 @@ class PolicyUpdater(Thread):
self._queue.task_done()
break
- if self._on_catch_up(audit) or not audit:
+ if self._on_catch_up(queued_audit) or not queued_audit:
continue
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (audit, policies_updated, policies_removed))
+ (queued_audit, policies_updated, policies_removed))
message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- DeployHandler.policy_update(audit, message)
- audit.audit_done()
+ self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message)
+
+ queued_audit.audit_done()
self._queue.task_done()
+ if self._need_to_catch_up:
+ self._pause_catch_up_timer()
+ self.catch_up()
+
PolicyUpdater._logger.info("exit policy-updater")
def _keep_running(self):
@@ -88,14 +107,68 @@ class PolicyUpdater(Thread):
self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit):
+ def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
PolicyUpdater._logger.info("catch_up requested")
with self._lock:
- self._aud_catch_up = audit
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
self.enqueue()
+ def _run_catch_up_timer(self):
+ """create and start the catch_up timer"""
+ if not self._catch_up_interval:
+ return
+
+ if self._catch_up_timer:
+ self._catch_up_timer.next()
+ self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+ return
+
+ self._catch_up_timer = StepTimer(
+ "catch_up_timer",
+ self._catch_up_interval,
+ PolicyUpdater.catch_up,
+ self
+ )
+ self._catch_up_timer.start()
+ self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+
+ def _pause_catch_up_timer(self):
+ """stop catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("pause catch_up_timer")
+ self._catch_up_timer.pause()
+
+ def _stop_catch_up_timer(self):
+ """stop catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("stopping catch_up_timer")
+ self._catch_up_timer.stop()
+ self._catch_up_timer.join()
+ self._catch_up_timer = None
+ self._logger.info("stopped catch_up_timer")
+
+ def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
+ """try not to send the duplicate messages on auto catchup unless hitting the max count"""
+ if self._need_to_catch_up \
+ or aud_catch_up.req_message != AUTO_CATCH_UP \
+ or self._catch_up_skips >= self._catch_up_max_skips \
+ or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
+ self._catch_up_skips = 0
+ self._need_to_catch_up = False
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ return True
+
+ self._catch_up_skips += 1
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "skip {0} sending the same catch_up: {1}".format(
+ self._catch_up_skips, self._catch_up_prev_message
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return False
+
def _reset_queue(self):
"""clear up the queue"""
with self._lock:
@@ -103,32 +176,34 @@ class PolicyUpdater(Thread):
self._queue.task_done()
self._queue = Queue()
- def _on_catch_up(self, audit):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- aud_catch_up = self._aud_catch_up
- if self._aud_catch_up:
- self._aud_catch_up = None
- self._lock.release()
+ def _on_catch_up(self, queued_audit):
+ """bring all the latest policies to DCAE-Controller"""
+ with self._lock:
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
if not aud_catch_up:
return False
PolicyUpdater._logger.info("catch_up")
+ self._pause_catch_up_timer()
- result = PolicyRest.get_latest_policies(aud_catch_up)
- result[CATCH_UP] = True
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
if not aud_catch_up.is_success():
PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
- if not audit:
+ if not queued_audit:
self._queue.task_done()
- else:
- DeployHandler.policy_update(aud_catch_up, result)
+ elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ DeployHandler.policy_update(aud_catch_up, catch_up_message)
self._reset_queue()
success, _, _ = aud_catch_up.audit_done()
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ self._run_catch_up_timer()
+
return success
def shutdown(self, audit):
@@ -137,5 +212,8 @@ class PolicyUpdater(Thread):
with self._lock:
self._aud_shutdown = audit
self.enqueue()
+
+ self._stop_catch_up_timer()
+
if self.is_alive():
self.join()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
index 652f98b..69978b6 100644
--- a/policyhandler/policy_utils.py
+++ b/policyhandler/policy_utils.py
@@ -18,11 +18,12 @@
"""policy-client communicates with policy-engine thru REST API"""
-import logging
import json
+import logging
import re
-from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
+from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
+ POLICY_VERSION)
class PolicyUtils(object):
"""policy-client utils"""
@@ -30,17 +31,6 @@ class PolicyUtils(object):
_policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
@staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except (ValueError, TypeError) as err:
- PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
- return json_str
-
- @staticmethod
def extract_policy_id(policy_name):
""" policy_name = policy_id + "." + <version> + "." + <extension>
For instance,
@@ -65,7 +55,7 @@ class PolicyUtils(object):
return policy
config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
if config:
- policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config)
+ policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
return policy
@staticmethod
@@ -131,3 +121,54 @@ class PolicyUtils(object):
policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
return policies
+
+class Utils(object):
+ """general purpose utils"""
+ _logger = logging.getLogger("policy_handler.utils")
+
+ @staticmethod
+ def safe_json_parse(json_str):
+ """try parsing json without exception - returns the json_str back if fails"""
+ if not json_str:
+ return json_str
+ try:
+ return json.loads(json_str)
+ except (ValueError, TypeError) as err:
+ Utils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
+ return json_str
+
+ @staticmethod
+ def are_the_same(body_1, body_2):
+ """check whether both objects are the same"""
+ if (body_1 and not body_2) or (not body_1 and body_2):
+ Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
+ return False
+
+ if body_1 is None and body_2 is None:
+ return True
+
+ if isinstance(body_1, list) and isinstance(body_2, list):
+ if len(body_1) != len(body_2):
+ Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for val_1, val_2 in zip(body_1, body_2):
+ if not Utils.are_the_same(val_1, val_2):
+ return False
+ return True
+
+ if isinstance(body_1, dict) and isinstance(body_2, dict):
+ if body_1.viewkeys() ^ body_2.viewkeys():
+ Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for key, val_1 in body_1.iteritems():
+ if not Utils.are_the_same(val_1, body_2[key]):
+ return False
+ return True
+
+ # ... here when primitive values or mismatched types ...
+ the_same_values = (body_1 == body_2)
+ if not the_same_values:
+ Utils._logger.debug("values %s != %s", body_1, body_2)
+ return the_same_values
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
new file mode 100644
index 0000000..6e2b3f6
--- /dev/null
+++ b/policyhandler/step_timer.py
@@ -0,0 +1,63 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""periodically callback"""
+
+from threading import Event, Thread
+
+
+class StepTimer(Thread):
+ """call on_time after interval number of seconds, then wait to continue"""
+ def __init__(self, name, interval, on_time, *args, **kwargs):
+ Thread.__init__(self, name=name)
+ self._interval = interval
+ self._on_time = on_time
+ self._args = args
+ self._kwargs = kwargs
+
+ self._timeout = Event()
+ self._paused = Event()
+ self._continue = Event()
+ self._finished = Event()
+
+ def next(self):
+ """continue with the next timeout"""
+ self._paused.clear()
+ self._continue.set()
+
+ def pause(self):
+ """pause the timer"""
+ self._paused.set()
+
+ def stop(self):
+ """stop the timer if it hasn't finished yet"""
+ self._finished.set()
+ self._timeout.set()
+ self._continue.set()
+
+ def run(self):
+ """loop until stopped=finished"""
+ while True:
+ self._timeout.wait(self._interval)
+ if self._finished.is_set():
+ break
+ self._timeout.clear()
+ self._continue.clear()
+ if not self._paused.is_set():
+ self._on_time(*self._args, **self._kwargs)
+ self._continue.wait()