From 1719606aee6f299f9fa03671c0265cdb6007567e Mon Sep 17 00:00:00 2001 From: ebo Date: Wed, 29 Apr 2020 00:18:17 +0100 Subject: pnf-sw-upgrade: fix lock-protected update of yang module The update must be scoped by a lock on yang module Other changes: - 'action' field no longer deleted; handling CREATED/MODIFIED events on this field - Configurable delay on timed-transitions via SWUG_TIMED_TRANSITION_TO environment variable Issue-ID: INT-1516 Change-Id: I22fb7b558ae371b6cff487633eae606f8fe535e1 Signed-off-by: ebo --- .../modules/pnf-sw-upgrade/subscriber.py | 104 ++++++++++----------- 1 file changed, 50 insertions(+), 54 deletions(-) diff --git a/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py b/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py index 0786637b0..0ebb2d654 100755 --- a/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py +++ b/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py @@ -22,8 +22,8 @@ __author__ = "Eliezio Oliveira " __copyright__ = "Copyright (C) 2020 Nordix Foundation" __license__ = "Apache 2.0" +import os import time -from concurrent.futures import ThreadPoolExecutor from threading import Timer import sysrepo as sr @@ -31,6 +31,9 @@ from loguru import logger YANG_MODULE_NAME = 'pnf-sw-upgrade' +XPATH_CTX = sr.Xpath_Ctx() +PAUSE_TO_LOCK = 0.5 + # # ----- BEGIN Finite State Machine definitions ----- # @@ -49,9 +52,10 @@ ST_DOWNLOAD_COMPLETED = 'DOWNLOAD_COMPLETED' ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS' ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED' -# Timeout used for timed transitions -TO_DOWNLOAD = 7 -TO_ACTIVATION = 7 +# Timeouts used for timed transitions +SWUG_TIMED_TRANSITION_TO = int(os.environ.get("SWUG_TIMED_TRANSITION_TO", "7")) +TO_DOWNLOAD = SWUG_TIMED_TRANSITION_TO +TO_ACTIVATION = SWUG_TIMED_TRANSITION_TO def timestamper(sess, key_id): @@ -102,6 +106,7 @@ STATE_MACHINE = { } } + # # ----- END Finite State Machine definitions ----- # @@ -129,17 +134,20 @@ def main(): # Function to be called for subscribed client of given session whenever configuration changes. def module_change_cb(sess, module_name, event, private_ctx): - try: - conn = private_ctx - change_path = xpath_of(None, 'action') - it = sess.get_changes_iter(change_path) - while True: - change = sess.get_change_next(it) - if change is None: - break - handle_change(conn, change.oper(), change.old_val(), change.new_val()) - except Exception as e: - logger.error(e) + if event == sr.SR_EV_APPLY: + try: + conn = private_ctx + change_path = xpath_of(None, 'action') + it = sess.get_changes_iter(change_path) + while True: + change = sess.get_change_next(it) + if change is None: + break + op = change.oper() + if op in (sr.SR_OP_CREATED, sr.SR_OP_MODIFIED): + handle_trigger_action(conn, sess, change.new_val()) + except Exception as e: + logger.error(e) return sr.SR_ERR_OK @@ -147,67 +155,55 @@ def module_change_cb(sess, module_name, event, private_ctx): # It does so by loading all the items of a session and printing them out. def print_current_config(session, module_name): select_xpath = f"/{module_name}:*//*" - values = session.get_items(select_xpath) - - if values is not None: + if values: logger.info("========== BEGIN CONFIG ==========") for i in range(values.val_cnt()): logger.info(values.val(i).to_string().strip()) logger.info("=========== END CONFIG ===========") -def handle_change(conn, op, old_val, new_val): +def handle_trigger_action(conn, sess, action_val): """ Handle individual changes on the model. """ - if op == sr.SR_OP_CREATED: - logger.info("CREATED: %s" % new_val.to_string()) - xpath = new_val.xpath() - last_node = xpath_ctx.last_node(xpath) - # Warning: 'key_value' modifies 'xpath'! - key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id') - if key_id and last_node == 'action': - executor.submit(execute_action, conn, key_id, new_val.data().get_enum()) - elif op == sr.SR_OP_DELETED: - logger.info("DELETED: %s" % old_val.to_string()) - elif op == sr.SR_OP_MODIFIED: - logger.info("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string())) - elif op == sr.SR_OP_MOVED: - logger.info("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath())) - - -def execute_action(conn, key_id, action): - sess = sr.Session(conn) - try: + logger.info("CREATED/MODIFIED: %s" % action_val.to_string()) + xpath = action_val.xpath() + last_node = XPATH_CTX.last_node(xpath) + # Warning: 'key_value' modifies 'xpath'! + key_id = XPATH_CTX.key_value(xpath, 'upgrade-package', 'id') + if key_id and last_node == 'action': + action = action_val.data().get_enum() cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum() next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None) if next_state_str: - handle_set_state(conn, key_id, next_state_str) - sess.delete_item(xpath_of(key_id, 'action')) - sess.commit() - finally: - sess.session_stop() + Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, next_state_str)).start() -def handle_set_state(conn, key_id, state_str): +def try_change_state(conn, key_id, state_str): sess = sr.Session(conn) try: - state = sr.Val(state_str, sr.SR_ENUM_T) - sess.set_item(xpath_of(key_id, 'current-status'), state) - on_enter = STATE_MACHINE[state_str].get('on_enter', None) - if on_enter: - # noinspection PyCallingNonCallable - on_enter(sess, key_id) - sess.commit() + try: + sess.lock_module(YANG_MODULE_NAME) + except RuntimeError: + logger.warning(f"Retrying after {PAUSE_TO_LOCK}s") + Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, state_str)).start() + return + try: + state = sr.Val(state_str, sr.SR_ENUM_T) + sess.set_item(xpath_of(key_id, 'current-status'), state) + on_enter = STATE_MACHINE[state_str].get('on_enter', None) + if callable(on_enter): + on_enter(sess, key_id) + sess.commit() + finally: + sess.unlock_module(YANG_MODULE_NAME) delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None]) if delay: - Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start() + Timer(delay, try_change_state, (conn, key_id, next_state_str)).start() finally: sess.session_stop() if __name__ == '__main__': - xpath_ctx = sr.Xpath_Ctx() - executor = ThreadPoolExecutor(max_workers=2) main() -- cgit 1.2.3-korg