aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-03-13 17:50:51 -0400
committerAlex Shatov <alexs@att.com>2018-03-13 17:50:51 -0400
commita2b262994a62286e1c98fd9939115c6e64ed27ee (patch)
treef6ef7056c70a34485d1ed047a296b89bb3354ee2
parentb9b955c0c2875effc1ce02d5576f0d2a59284c5d (diff)
2.3.0 policy-handler - periodically catch_up
- periodically catchup - interval is configurable = max_skips defines the number of times the catch_up message that is identical to previous one can be skipped - do not catchup more often than the interval even between the manual catchup and auto catchup - do not send the same catchup message twice in a row to the deployment-handler but not exceed a hard limit on catchup max_skips - catchup if the deployment-handler instance is changed Change-Id: I9a3fcc941e8a9e553abb3952dd882c37e0f5fdfe Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-389
-rw-r--r--etc_upload/config.json4
-rw-r--r--policyhandler/deploy_handler.py31
-rw-r--r--policyhandler/policy_consts.py1
-rw-r--r--policyhandler/policy_updater.py118
-rw-r--r--policyhandler/policy_utils.py69
-rw-r--r--policyhandler/step_timer.py63
-rw-r--r--pom.xml2
-rw-r--r--setup.py2
-rw-r--r--tests/test_policyhandler.py39
-rw-r--r--version.properties2
10 files changed, 255 insertions, 76 deletions
diff --git a/etc_upload/config.json b/etc_upload/config.json
index e143773..920ef7c 100644
--- a/etc_upload/config.json
+++ b/etc_upload/config.json
@@ -6,6 +6,10 @@
"scope_prefixes" : ["DCAE_alex.Config_", "DCAE.Config_"],
"policy_retry_count" : 5,
"policy_retry_sleep" : 5,
+ "catch_up" : {
+ "interval" : 1200,
+ "max_skips" : 5
+ },
"policy_engine" : {
"url" : "https://peawiv9nspd01.pedc.sbc.com:8081",
"path_pdp" :"/pdp/",
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 0950dfd..493b7d5 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -41,6 +41,7 @@ class DeployHandler(object):
_url_path = None
_target_entity = None
_custom_kwargs = None
+ _server_instance_uuid = None
@staticmethod
def _lazy_init(audit):
@@ -72,7 +73,11 @@ class DeployHandler(object):
@staticmethod
def policy_update(audit, message):
- """post policy_updated message to deploy-handler"""
+ """
+ post policy_updated message to deploy-handler
+
+ returns condition whether it needs to catch_up
+ """
if not message:
return
@@ -116,8 +121,24 @@ class DeployHandler(object):
sub_aud.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- sub_aud.metrics("response {0} from {1}: text={2}{3}" \
- .format(res.status_code, log_action, res.text, log_data))
+ log_line = "response {0} from {1}: text={2}{3}" \
+ .format(res.status_code, log_action, res.text, log_data)
+ sub_aud.metrics(log_line)
+ DeployHandler._logger.info(log_line)
+
+ if res.status_code != requests.codes.ok:
+ return
+
+ result = res.json() or {}
+ prev_server_instance_uuid = DeployHandler._server_instance_uuid
+ DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
+
+ need_to_catch_up = (prev_server_instance_uuid
+ and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
+ if need_to_catch_up:
+ log_line = "need_to_catch_up: {1} != {0}" \
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)
+ sub_aud.info(log_line)
+ DeployHandler._logger.info(log_line)
- if res.status_code == requests.codes.ok:
- return res.json()
+ return need_to_catch_up
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 013ed2a..90ede47 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -25,6 +25,7 @@ POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
CATCH_UP = "catch_up"
+AUTO_CATCH_UP = "auto catch_up"
LATEST_POLICIES = "latest_policies"
REMOVED_POLICIES = "removed_policies"
ERRORED_POLICIES = "errored_policies"
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 01be550..e12af88 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -18,15 +18,21 @@
"""policy-updater thread"""
+import copy
import json
import logging
from Queue import Queue
from threading import Lock, Thread
+from .config import Config
from .deploy_handler import DeployHandler
from .onap.audit import Audit
-from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES
+from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
+ REMOVED_POLICIES)
from .policy_rest import PolicyRest
+from .policy_utils import Utils
+from .step_timer import StepTimer
+
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -37,9 +43,17 @@ class PolicyUpdater(Thread):
Thread.__init__(self, name="policy_updater")
self.daemon = True
+ self._catch_up_timer = None
self._aud_shutdown = None
self._aud_catch_up = None
+ catch_up_config = Config.config.get("catch_up", {})
+ self._catch_up_interval = catch_up_config.get("interval") or 15*60
+ self._catch_up_max_skips = catch_up_config.get("max_skips") or 3
+ self._catch_up_skips = 0
+ self._catch_up_prev_message = None
+ self._need_to_catch_up = False
+
self._lock = Lock()
self._queue = Queue()
@@ -57,7 +71,7 @@ class PolicyUpdater(Thread):
"""wait and run the policy-update in thread"""
while True:
PolicyUpdater._logger.info("waiting for policy-updates...")
- audit, policies_updated, policies_removed = self._queue.get()
+ queued_audit, policies_updated, policies_removed = self._queue.get()
PolicyUpdater._logger.info(
"got policies_updated %s policies_removed %s",
json.dumps(policies_updated), json.dumps(policies_removed))
@@ -66,17 +80,22 @@ class PolicyUpdater(Thread):
self._queue.task_done()
break
- if self._on_catch_up(audit) or not audit:
+ if self._on_catch_up(queued_audit) or not queued_audit:
continue
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (audit, policies_updated, policies_removed))
+ (queued_audit, policies_updated, policies_removed))
message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
- DeployHandler.policy_update(audit, message)
- audit.audit_done()
+ self._need_to_catch_up = DeployHandler.policy_update(queued_audit, message)
+
+ queued_audit.audit_done()
self._queue.task_done()
+ if self._need_to_catch_up:
+ self._pause_catch_up_timer()
+ self.catch_up()
+
PolicyUpdater._logger.info("exit policy-updater")
def _keep_running(self):
@@ -88,14 +107,68 @@ class PolicyUpdater(Thread):
self._aud_shutdown.audit_done()
return keep_running
- def catch_up(self, audit):
+ def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
PolicyUpdater._logger.info("catch_up requested")
with self._lock:
- self._aud_catch_up = audit
+ self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
self.enqueue()
+ def _run_catch_up_timer(self):
+ """create and start the catch_up timer"""
+ if not self._catch_up_interval:
+ return
+
+ if self._catch_up_timer:
+ self._catch_up_timer.next()
+ self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+ return
+
+ self._catch_up_timer = StepTimer(
+ "catch_up_timer",
+ self._catch_up_interval,
+ PolicyUpdater.catch_up,
+ self
+ )
+ self._catch_up_timer.start()
+ self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+
+ def _pause_catch_up_timer(self):
+ """stop catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("pause catch_up_timer")
+ self._catch_up_timer.pause()
+
+ def _stop_catch_up_timer(self):
+ """stop catch_up_timer"""
+ if self._catch_up_timer:
+ self._logger.info("stopping catch_up_timer")
+ self._catch_up_timer.stop()
+ self._catch_up_timer.join()
+ self._catch_up_timer = None
+ self._logger.info("stopped catch_up_timer")
+
+ def _need_to_send_catch_up(self, aud_catch_up, catch_up_message):
+ """try not to send the duplicate messages on auto catchup unless hitting the max count"""
+ if self._need_to_catch_up \
+ or aud_catch_up.req_message != AUTO_CATCH_UP \
+ or self._catch_up_skips >= self._catch_up_max_skips \
+ or not Utils.are_the_same(catch_up_message, self._catch_up_prev_message):
+ self._catch_up_skips = 0
+ self._need_to_catch_up = False
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ return True
+
+ self._catch_up_skips += 1
+ self._catch_up_prev_message = copy.deepcopy(catch_up_message)
+ log_line = "skip {0} sending the same catch_up: {1}".format(
+ self._catch_up_skips, self._catch_up_prev_message
+ )
+ self._logger.info(log_line)
+ aud_catch_up.info(log_line)
+ return False
+
def _reset_queue(self):
"""clear up the queue"""
with self._lock:
@@ -103,32 +176,34 @@ class PolicyUpdater(Thread):
self._queue.task_done()
self._queue = Queue()
- def _on_catch_up(self, audit):
- """Bring the latest policies to DCAE-Controller"""
- self._lock.acquire()
- aud_catch_up = self._aud_catch_up
- if self._aud_catch_up:
- self._aud_catch_up = None
- self._lock.release()
+ def _on_catch_up(self, queued_audit):
+ """bring all the latest policies to DCAE-Controller"""
+ with self._lock:
+ aud_catch_up = self._aud_catch_up
+ if self._aud_catch_up:
+ self._aud_catch_up = None
if not aud_catch_up:
return False
PolicyUpdater._logger.info("catch_up")
+ self._pause_catch_up_timer()
- result = PolicyRest.get_latest_policies(aud_catch_up)
- result[CATCH_UP] = True
+ catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+ catch_up_message[CATCH_UP] = True
if not aud_catch_up.is_success():
PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
- if not audit:
+ if not queued_audit:
self._queue.task_done()
- else:
- DeployHandler.policy_update(aud_catch_up, result)
+ elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+ DeployHandler.policy_update(aud_catch_up, catch_up_message)
self._reset_queue()
success, _, _ = aud_catch_up.audit_done()
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+ self._run_catch_up_timer()
+
return success
def shutdown(self, audit):
@@ -137,5 +212,8 @@ class PolicyUpdater(Thread):
with self._lock:
self._aud_shutdown = audit
self.enqueue()
+
+ self._stop_catch_up_timer()
+
if self.is_alive():
self.join()
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
index 652f98b..69978b6 100644
--- a/policyhandler/policy_utils.py
+++ b/policyhandler/policy_utils.py
@@ -18,11 +18,12 @@
"""policy-client communicates with policy-engine thru REST API"""
-import logging
import json
+import logging
import re
-from .policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
+from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
+ POLICY_VERSION)
class PolicyUtils(object):
"""policy-client utils"""
@@ -30,17 +31,6 @@ class PolicyUtils(object):
_policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
@staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except (ValueError, TypeError) as err:
- PolicyUtils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
- return json_str
-
- @staticmethod
def extract_policy_id(policy_name):
""" policy_name = policy_id + "." + <version> + "." + <extension>
For instance,
@@ -65,7 +55,7 @@ class PolicyUtils(object):
return policy
config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
if config:
- policy[POLICY_BODY][POLICY_CONFIG] = PolicyUtils.safe_json_parse(config)
+ policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
return policy
@staticmethod
@@ -131,3 +121,54 @@ class PolicyUtils(object):
policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
return policies
+
+class Utils(object):
+ """general purpose utils"""
+ _logger = logging.getLogger("policy_handler.utils")
+
+ @staticmethod
+ def safe_json_parse(json_str):
+ """try parsing json without exception - returns the json_str back if fails"""
+ if not json_str:
+ return json_str
+ try:
+ return json.loads(json_str)
+ except (ValueError, TypeError) as err:
+ Utils._logger.warn("unexpected json %s: %s", str(json_str), str(err))
+ return json_str
+
+ @staticmethod
+ def are_the_same(body_1, body_2):
+ """check whether both objects are the same"""
+ if (body_1 and not body_2) or (not body_1 and body_2):
+ Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
+ return False
+
+ if body_1 is None and body_2 is None:
+ return True
+
+ if isinstance(body_1, list) and isinstance(body_2, list):
+ if len(body_1) != len(body_2):
+ Utils._logger.debug("len %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for val_1, val_2 in zip(body_1, body_2):
+ if not Utils.are_the_same(val_1, val_2):
+ return False
+ return True
+
+ if isinstance(body_1, dict) and isinstance(body_2, dict):
+ if body_1.viewkeys() ^ body_2.viewkeys():
+ Utils._logger.debug("keys %s != %s", json.dumps(body_1), json.dumps(body_2))
+ return False
+
+ for key, val_1 in body_1.iteritems():
+ if not Utils.are_the_same(val_1, body_2[key]):
+ return False
+ return True
+
+ # ... here when primitive values or mismatched types ...
+ the_same_values = (body_1 == body_2)
+ if not the_same_values:
+ Utils._logger.debug("values %s != %s", body_1, body_2)
+ return the_same_values
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
new file mode 100644
index 0000000..6e2b3f6
--- /dev/null
+++ b/policyhandler/step_timer.py
@@ -0,0 +1,63 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""periodically callback"""
+
+from threading import Event, Thread
+
+
+class StepTimer(Thread):
+ """call on_time after interval number of seconds, then wait to continue"""
+ def __init__(self, name, interval, on_time, *args, **kwargs):
+ Thread.__init__(self, name=name)
+ self._interval = interval
+ self._on_time = on_time
+ self._args = args
+ self._kwargs = kwargs
+
+ self._timeout = Event()
+ self._paused = Event()
+ self._continue = Event()
+ self._finished = Event()
+
+ def next(self):
+ """continue with the next timeout"""
+ self._paused.clear()
+ self._continue.set()
+
+ def pause(self):
+ """pause the timer"""
+ self._paused.set()
+
+ def stop(self):
+ """stop the timer if it hasn't finished yet"""
+ self._finished.set()
+ self._timeout.set()
+ self._continue.set()
+
+ def run(self):
+ """loop until stopped=finished"""
+ while True:
+ self._timeout.wait(self._interval)
+ if self._finished.is_set():
+ break
+ self._timeout.clear()
+ self._continue.clear()
+ if not self._paused.is_set():
+ self._on_time(*self._args, **self._kwargs)
+ self._continue.wait()
diff --git a/pom.xml b/pom.xml
index 2ed4312..97b9df5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform</groupId>
<artifactId>policy-handler</artifactId>
<name>dcaegen2-platform-policy-handler</name>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/setup.py b/setup.py
index fa2bc73..3165504 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='policyhandler',
description='DCAE-Controller policy-handler to communicate with policy-engine',
- version="2.2.0",
+ version="2.3.0",
author='Alex Shatov',
packages=['policyhandler'],
zip_safe=False,
diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py
index 375c358..b52e718 100644
--- a/tests/test_policyhandler.py
+++ b/tests/test_policyhandler.py
@@ -44,7 +44,7 @@ from policyhandler.policy_handler import LogWriter
from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER,
REMOVED_POLICIES, PolicyReceiver)
from policyhandler.policy_rest import PolicyRest
-from policyhandler.policy_utils import PolicyUtils
+from policyhandler.policy_utils import PolicyUtils, Utils
from policyhandler.web_server import _PolicyWeb
POLICY_HANDLER_VERSION = "2.2.0"
@@ -150,31 +150,6 @@ class MonkeyPolicyBody(object):
"property": None
}
- @staticmethod
- def is_the_same_dict(policy_body_1, policy_body_2):
- """check whether both policy_body objects are the same"""
- if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict):
- return False
- for key in policy_body_1.keys():
- if key not in policy_body_2:
- return False
-
- val_1 = policy_body_1[key]
- val_2 = policy_body_2[key]
- if isinstance(val_1, list) and isinstance(val_2, list):
- if sorted(val_1) != sorted(val_2):
- return False
- continue
-
- if isinstance(val_1, dict) \
- and not MonkeyPolicyBody.is_the_same_dict(val_1, val_2):
- return False
- if (val_1 is None and val_2 is not None) \
- or (val_1 is not None and val_2 is None) \
- or (val_1 != val_2):
- return False
- return True
-
class MonkeyPolicyEngine(object):
"""pretend this is the policy-engine"""
_scope_prefix = Config.config["scope_prefixes"][0]
@@ -358,8 +333,7 @@ def test_get_policy_latest(fix_pdp_post):
Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
Settings.logger.info("policy_latest: %s", json.dumps(policy_latest))
- assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy)
- assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest)
+ assert Utils.are_the_same(policy_latest, expected_policy)
def test_healthcheck():
"""test /healthcheck"""
@@ -422,8 +396,7 @@ class WebServerTest(CPWebCase):
Settings.logger.info("policy_latest: %s", self.body)
Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
- assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy)
- assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest)
+ assert Utils.are_the_same(policy_latest, expected_policy)
def test_web_all_policies_latest(self):
"""test GET /policies_latest"""
@@ -441,8 +414,7 @@ class WebServerTest(CPWebCase):
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
- assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies)
- assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest)
+ assert Utils.are_the_same(policies_latest, expected_policies)
def test_web_policies_latest(self):
"""test POST /policies_latest with policyName"""
@@ -466,8 +438,7 @@ class WebServerTest(CPWebCase):
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
- assert MonkeyPolicyBody.is_the_same_dict(policies_latest, expected_policies)
- assert MonkeyPolicyBody.is_the_same_dict(expected_policies, policies_latest)
+ assert Utils.are_the_same(policies_latest, expected_policies)
@pytest.mark.usefixtures(
"fix_deploy_handler",
diff --git a/version.properties b/version.properties
index 5791c10..744a2a4 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
major=2
-minor=2
+minor=3
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}