aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xtest/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py104
1 files changed, 50 insertions, 54 deletions
diff --git a/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py b/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py
index 0786637b0..0ebb2d654 100755
--- a/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py
+++ b/test/mocks/netconf-pnp-simulator/modules/pnf-sw-upgrade/subscriber.py
@@ -22,8 +22,8 @@ __author__ = "Eliezio Oliveira <eliezio.oliveira@est.tech>"
__copyright__ = "Copyright (C) 2020 Nordix Foundation"
__license__ = "Apache 2.0"
+import os
import time
-from concurrent.futures import ThreadPoolExecutor
from threading import Timer
import sysrepo as sr
@@ -31,6 +31,9 @@ from loguru import logger
YANG_MODULE_NAME = 'pnf-sw-upgrade'
+XPATH_CTX = sr.Xpath_Ctx()
+PAUSE_TO_LOCK = 0.5
+
#
# ----- BEGIN Finite State Machine definitions -----
#
@@ -49,9 +52,10 @@ ST_DOWNLOAD_COMPLETED = 'DOWNLOAD_COMPLETED'
ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS'
ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED'
-# Timeout used for timed transitions
-TO_DOWNLOAD = 7
-TO_ACTIVATION = 7
+# Timeouts used for timed transitions
+SWUG_TIMED_TRANSITION_TO = int(os.environ.get("SWUG_TIMED_TRANSITION_TO", "7"))
+TO_DOWNLOAD = SWUG_TIMED_TRANSITION_TO
+TO_ACTIVATION = SWUG_TIMED_TRANSITION_TO
def timestamper(sess, key_id):
@@ -102,6 +106,7 @@ STATE_MACHINE = {
}
}
+
#
# ----- END Finite State Machine definitions -----
#
@@ -129,17 +134,20 @@ def main():
# Function to be called for subscribed client of given session whenever configuration changes.
def module_change_cb(sess, module_name, event, private_ctx):
- try:
- conn = private_ctx
- change_path = xpath_of(None, 'action')
- it = sess.get_changes_iter(change_path)
- while True:
- change = sess.get_change_next(it)
- if change is None:
- break
- handle_change(conn, change.oper(), change.old_val(), change.new_val())
- except Exception as e:
- logger.error(e)
+ if event == sr.SR_EV_APPLY:
+ try:
+ conn = private_ctx
+ change_path = xpath_of(None, 'action')
+ it = sess.get_changes_iter(change_path)
+ while True:
+ change = sess.get_change_next(it)
+ if change is None:
+ break
+ op = change.oper()
+ if op in (sr.SR_OP_CREATED, sr.SR_OP_MODIFIED):
+ handle_trigger_action(conn, sess, change.new_val())
+ except Exception as e:
+ logger.error(e)
return sr.SR_ERR_OK
@@ -147,67 +155,55 @@ def module_change_cb(sess, module_name, event, private_ctx):
# It does so by loading all the items of a session and printing them out.
def print_current_config(session, module_name):
select_xpath = f"/{module_name}:*//*"
-
values = session.get_items(select_xpath)
-
- if values is not None:
+ if values:
logger.info("========== BEGIN CONFIG ==========")
for i in range(values.val_cnt()):
logger.info(values.val(i).to_string().strip())
logger.info("=========== END CONFIG ===========")
-def handle_change(conn, op, old_val, new_val):
+def handle_trigger_action(conn, sess, action_val):
"""
Handle individual changes on the model.
"""
- if op == sr.SR_OP_CREATED:
- logger.info("CREATED: %s" % new_val.to_string())
- xpath = new_val.xpath()
- last_node = xpath_ctx.last_node(xpath)
- # Warning: 'key_value' modifies 'xpath'!
- key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id')
- if key_id and last_node == 'action':
- executor.submit(execute_action, conn, key_id, new_val.data().get_enum())
- elif op == sr.SR_OP_DELETED:
- logger.info("DELETED: %s" % old_val.to_string())
- elif op == sr.SR_OP_MODIFIED:
- logger.info("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string()))
- elif op == sr.SR_OP_MOVED:
- logger.info("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath()))
-
-
-def execute_action(conn, key_id, action):
- sess = sr.Session(conn)
- try:
+ logger.info("CREATED/MODIFIED: %s" % action_val.to_string())
+ xpath = action_val.xpath()
+ last_node = XPATH_CTX.last_node(xpath)
+ # Warning: 'key_value' modifies 'xpath'!
+ key_id = XPATH_CTX.key_value(xpath, 'upgrade-package', 'id')
+ if key_id and last_node == 'action':
+ action = action_val.data().get_enum()
cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum()
next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None)
if next_state_str:
- handle_set_state(conn, key_id, next_state_str)
- sess.delete_item(xpath_of(key_id, 'action'))
- sess.commit()
- finally:
- sess.session_stop()
+ Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, next_state_str)).start()
-def handle_set_state(conn, key_id, state_str):
+def try_change_state(conn, key_id, state_str):
sess = sr.Session(conn)
try:
- state = sr.Val(state_str, sr.SR_ENUM_T)
- sess.set_item(xpath_of(key_id, 'current-status'), state)
- on_enter = STATE_MACHINE[state_str].get('on_enter', None)
- if on_enter:
- # noinspection PyCallingNonCallable
- on_enter(sess, key_id)
- sess.commit()
+ try:
+ sess.lock_module(YANG_MODULE_NAME)
+ except RuntimeError:
+ logger.warning(f"Retrying after {PAUSE_TO_LOCK}s")
+ Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, state_str)).start()
+ return
+ try:
+ state = sr.Val(state_str, sr.SR_ENUM_T)
+ sess.set_item(xpath_of(key_id, 'current-status'), state)
+ on_enter = STATE_MACHINE[state_str].get('on_enter', None)
+ if callable(on_enter):
+ on_enter(sess, key_id)
+ sess.commit()
+ finally:
+ sess.unlock_module(YANG_MODULE_NAME)
delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None])
if delay:
- Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start()
+ Timer(delay, try_change_state, (conn, key_id, next_state_str)).start()
finally:
sess.session_stop()
if __name__ == '__main__':
- xpath_ctx = sr.Xpath_Ctx()
- executor = ThreadPoolExecutor(max_workers=2)
main()