summaryrefslogtreecommitdiffstats
path: root/test/mocks/emssimulator/swm/installNESw
diff options
context:
space:
mode:
authorEnbo Wang <wangenbo@huawei.com>2019-04-23 13:42:21 +0000
committerEnbo Wang <wangenbo@huawei.com>2019-04-23 14:17:45 +0000
commit6ab8b62c35c6c850b95dbb3206de78a6ddb1206d (patch)
tree3d20b3abea69a298426e70e1fe68efa8015e7d13 /test/mocks/emssimulator/swm/installNESw
parentca2b87e4a1e01d1184b0793ea98ddd5385dec6a6 (diff)
Add an EMS simulator
Change-Id: I4fedf9a812e19033e7f9a1bff55eae264bc5122f Issue-ID: INT-1041 Signed-off-by: Enbo Wang <wangenbo@huawei.com>
Diffstat (limited to 'test/mocks/emssimulator/swm/installNESw')
-rwxr-xr-xtest/mocks/emssimulator/swm/installNESw206
1 files changed, 206 insertions, 0 deletions
diff --git a/test/mocks/emssimulator/swm/installNESw b/test/mocks/emssimulator/swm/installNESw
new file mode 100755
index 000000000..84e2fb9ae
--- /dev/null
+++ b/test/mocks/emssimulator/swm/installNESw
@@ -0,0 +1,206 @@
+#!/usr/bin/python
+
+import sys
+import argparse
+import json
+import os
+import shutil
+import random
+import time
+import tempfile
+import zipfile
+
+import conf
+import ems_util
+
+
+def do_install_sw(sw_to_be_installed, ne_info):
+ """
+ return err, reason, installed_ne_sw
+ """
+
+ sw_install_dir_in_ne = conf.PNF_SIMULATORS_DIR + '/' + ne_info['omIP'] + conf.PNF_SW_INSTALL_DIR
+
+ if sw_to_be_installed.startswith('/'):
+ file_location = sw_to_be_installed
+ else:
+ sw_download_dir_in_ne = ne_info.get("downloadedSwLocation", "")
+ file_location = sw_download_dir_in_ne + '/' + sw_to_be_installed
+
+ if not os.access(file_location, os.R_OK):
+ return True, "Missing to be installed SW file %s" % file_location, None
+
+ try:
+ if not os.path.isdir(sw_install_dir_in_ne):
+ os.makedirs(sw_install_dir_in_ne)
+ except OSError as e:
+ return True, str(e), None
+
+ temp_dir = tempfile.mkdtemp(dir=sw_install_dir_in_ne)
+ if file_location.endswith(".zip"):
+ with zipfile.ZipFile(file_location) as sw_zip:
+ sw_zip.extractall(temp_dir)
+ else:
+ return True, "Only support zip file", None
+
+ manifest_location = temp_dir + '/' + conf.MANIFEST_FILE
+ if os.access(manifest_location, os.R_OK):
+ with open(manifest_location) as f_manifest:
+ manifest = json.load(f_manifest)
+ else:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ return True, "Missing manifest file in %s" % file_location, None
+
+ try:
+ target_sw_name = manifest["name"]
+ target_sw_version = manifest["version"]
+ except KeyError as e:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ return True, "Missing key %s in %s of %s" % (str(e), conf.MANIFEST_FILE, file_location), None
+
+ if "targetSwVersion" in ne_info and ne_info["targetSwVersion"] != target_sw_version:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ return True, "Conflicted targetVersion for %s" % file_location, None
+
+ ne_info["targetSwVersion"] = target_sw_version
+ ems_util.update_ne_info(ne_info)
+
+ target_sw_parent_dir = sw_install_dir_in_ne + '/' + target_sw_version
+ try:
+ if not os.path.isdir(target_sw_parent_dir):
+ os.makedirs(target_sw_parent_dir)
+ except OSError as e:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ return True, str(e), None
+
+ target_sw_dir = target_sw_parent_dir + '/' + target_sw_name
+ if os.path.isdir(target_sw_dir):
+ shutil.rmtree(target_sw_dir, ignore_errors=True)
+
+ try:
+ shutil.move(temp_dir, target_sw_dir)
+ except shutil.Error as e:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ return True, str(e), None
+
+ installed_ne_sw = target_sw_name + '-' + target_sw_version
+
+ installed_sw_db = target_sw_parent_dir + '/' + conf.INSTALLED_SW
+ if os.path.isfile(installed_sw_db):
+ with open(installed_sw_db) as f_installed_sw:
+ installed_sw_table = json.load(f_installed_sw)
+ if not installed_sw_table:
+ installed_sw_table = {}
+ else:
+ installed_sw_table = {}
+
+ target_sw_info = {
+ "name": target_sw_name,
+ "version": target_sw_version,
+ "installedLocation": target_sw_dir
+ }
+ installed_sw_table[installed_ne_sw] = target_sw_info
+
+ with open(installed_sw_db, 'w') as f_installed_sw:
+ json.dump(installed_sw_table, f_installed_sw, indent=2)
+
+ ne_info["installedSw"] = installed_sw_table
+
+ return False, None, installed_ne_sw
+
+
+def generate_notification(install_process_id, install_status, installed_ne_sw_info, failed_sw_info):
+ notification = {
+ "objectClass": "EMSClass",
+ "objectInstance": "EMSInstance",
+ "notificationId": random.randint(1, conf.MAX_INT),
+ "eventTime": time.asctime(),
+ "systemDN": "emssimulator",
+ "notificationType": "notifyInstallNESwStatusChanged",
+ "installProcessId": install_process_id,
+ "installOperationStatus": install_status
+ }
+
+ if installed_ne_sw_info:
+ notification["installedNESwInfo"] = installed_ne_sw_info
+
+ if failed_sw_info:
+ notification["failedSwInfo"] = failed_sw_info
+
+ return notification
+
+
+def install_ne_sw(sw_to_be_installed, ne_id):
+ ne_info = ems_util.get_ne_info_from_db_by_id(ne_id)
+
+ install_process_id = random.randint(1, conf.MAX_INT)
+ result = conf.REQ_SUCCESS
+ ret_value = {
+ "installProcessId": install_process_id,
+ "result": result
+ }
+
+ if not ne_info:
+ ret_value["result"] = conf.REQ_FAILURE
+ ret_value["reason"] = "Can not find NE %s" % ne_id
+ return ret_value
+
+ ne_info["status"] = conf.STATUS_INSTALLING
+ ems_util.update_ne_info(ne_info)
+
+ installed_ne_sw_info = []
+ failed_sw_info = []
+
+ err, reason, installed_ne_sw = do_install_sw(sw_to_be_installed, ne_info)
+
+ if not err:
+ installed_ne_sw_info.append(installed_ne_sw)
+ else:
+ result = conf.REQ_FAILURE
+ failed_sw_entry = {
+ "failedSw": installed_ne_sw,
+ "failureReason": reason
+ }
+ failed_sw_info.append(failed_sw_entry)
+
+ num_installed_ne_sw = len(installed_ne_sw_info)
+
+ if num_installed_ne_sw == 1:
+ install_status = "NE_SWINSTALLATION_SUCCESSFUL"
+ elif num_installed_ne_sw == 0:
+ install_status = "NE_SWINSTALLATION_FAILED"
+ else:
+ install_status = "NE_SWINSTALLATION_PARTIALLY_SUCCESSFUL"
+
+ notification = generate_notification(install_process_id, install_status, installed_ne_sw_info, failed_sw_info)
+ ems_util.send_notification(notification, install_process_id)
+
+ if result == conf.REQ_SUCCESS:
+ ems_util.update_ne_info(ne_info)
+ else:
+ ret_value["result"] = result
+ ret_value["reason"] = json.dumps(failed_sw_info)
+
+ # for automated software management, there is no listOfStepNumbersAndDurations
+ return ret_value
+
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--swToBeInstalled", help="The NE software to be installed", required=True)
+ parser.add_argument("--neIdentifier", help="The NE where the software can be installed", required=True)
+
+ args = parser.parse_args()
+
+ ret_value = install_ne_sw(args.swToBeInstalled, args.neIdentifier)
+ print json.dumps(ret_value)
+
+ if ret_value["result"] == conf.REQ_SUCCESS:
+ sys.exit(conf.RET_CODE_SUCCESS)
+ else:
+ sys.exit(conf.RET_CODE_FAILURE)
+
+
+if __name__ == '__main__':
+ main()