#!/usr/bin/python
"""Install a software package into a simulated NE and report the outcome.

The script unpacks a zipped software package into the install directory of
the NE identified on the command line, records it in the installed-software
table, sends an install-status notification through ``ems_util`` and prints
the JSON result on stdout.
"""

import sys
import argparse
import json
import os
import shutil
import random
import time
import tempfile
import zipfile

import conf
import ems_util


def _abort_install(temp_dir, reason):
    """Remove the scratch extraction directory and build a failure result.

    Returns the same ``(err, reason, installed_ne_sw)`` tuple shape as
    ``do_install_sw`` so callers can ``return`` it directly.
    """
    shutil.rmtree(temp_dir, ignore_errors=True)
    return True, reason, None


def do_install_sw(sw_to_be_installed, ne_info):
    """Unpack one zipped software package into the NE's install directory.

    :param sw_to_be_installed: path of the package; an absolute path is used
        as is, anything else is looked up under
        ``ne_info["downloadedSwLocation"]``.
    :param ne_info: NE record (dict). ``omIP`` is read to locate the install
        directory; ``targetSwVersion`` and ``installedSw`` are written.
    :return: ``(err, reason, installed_ne_sw)``. On success ``err`` is False,
        ``reason`` is None and ``installed_ne_sw`` is ``"<name>-<version>"``
        taken from the package manifest. On failure ``err`` is True,
        ``reason`` describes the problem and ``installed_ne_sw`` is None.
    """
    sw_install_dir_in_ne = conf.PNF_SIMULATORS_DIR + '/' + ne_info['omIP'] + conf.PNF_SW_INSTALL_DIR

    if sw_to_be_installed.startswith('/'):
        file_location = sw_to_be_installed
    else:
        sw_download_dir_in_ne = ne_info.get("downloadedSwLocation", "")
        file_location = sw_download_dir_in_ne + '/' + sw_to_be_installed

    if not os.access(file_location, os.R_OK):
        return True, "Missing to be installed SW file %s" % file_location, None

    try:
        if not os.path.isdir(sw_install_dir_in_ne):
            os.makedirs(sw_install_dir_in_ne)
    except OSError as e:
        return True, str(e), None

    # Reject unsupported files before the scratch directory is created, so
    # this early return has nothing to clean up.
    if not file_location.endswith(".zip"):
        return True, "Only support zip file", None

    try:
        temp_dir = tempfile.mkdtemp(dir=sw_install_dir_in_ne)
    except OSError as e:
        return True, str(e), None

    # A corrupt or unreadable archive must be reported as a failed install
    # rather than escaping as a traceback and leaving temp_dir behind.
    try:
        with zipfile.ZipFile(file_location) as sw_zip:
            sw_zip.extractall(temp_dir)
    except (zipfile.BadZipfile, IOError, OSError) as e:
        return _abort_install(temp_dir, "Failed to extract %s: %s" % (file_location, str(e)))

    manifest_location = temp_dir + '/' + conf.MANIFEST_FILE
    if not os.access(manifest_location, os.R_OK):
        return _abort_install(temp_dir, "Missing manifest file in %s" % file_location)

    try:
        with open(manifest_location) as f_manifest:
            manifest = json.load(f_manifest)
    except ValueError as e:
        return _abort_install(
            temp_dir,
            "Invalid %s in %s: %s" % (conf.MANIFEST_FILE, file_location, str(e)))

    if not isinstance(manifest, dict):
        return _abort_install(
            temp_dir,
            "Invalid %s in %s: not a JSON object" % (conf.MANIFEST_FILE, file_location))

    try:
        target_sw_name = manifest["name"]
        target_sw_version = manifest["version"]
    except KeyError as e:
        return _abort_install(
            temp_dir,
            "Missing key %s in %s of %s" % (str(e), conf.MANIFEST_FILE, file_location))

    # An NE that already has a target version only accepts packages of that
    # same version.
    if "targetSwVersion" in ne_info and ne_info["targetSwVersion"] != target_sw_version:
        return _abort_install(temp_dir, "Conflicted targetVersion for %s" % file_location)

    ne_info["targetSwVersion"] = target_sw_version
    ems_util.update_ne_info(ne_info)

    target_sw_parent_dir = sw_install_dir_in_ne + '/' + target_sw_version
    try:
        if not os.path.isdir(target_sw_parent_dir):
            os.makedirs(target_sw_parent_dir)
    except OSError as e:
        return _abort_install(temp_dir, str(e))

    # Replace any earlier installation of the same software name/version.
    target_sw_dir = target_sw_parent_dir + '/' + target_sw_name
    if os.path.isdir(target_sw_dir):
        shutil.rmtree(target_sw_dir, ignore_errors=True)

    try:
        shutil.move(temp_dir, target_sw_dir)
    except (shutil.Error, IOError, OSError) as e:
        return _abort_install(temp_dir, str(e))

    installed_ne_sw = target_sw_name + '-' + target_sw_version

    installed_sw_db = target_sw_parent_dir + '/' + conf.INSTALLED_SW
    if os.path.isfile(installed_sw_db):
        with open(installed_sw_db) as f_installed_sw:
            installed_sw_table = json.load(f_installed_sw)
        # An empty or null table on disk is treated as "nothing installed".
        if not installed_sw_table:
            installed_sw_table = {}
    else:
        installed_sw_table = {}

    target_sw_info = {
        "name": target_sw_name,
        "version": target_sw_version,
        "installedLocation": target_sw_dir
    }
    installed_sw_table[installed_ne_sw] = target_sw_info

    with open(installed_sw_db, 'w') as f_installed_sw:
        json.dump(installed_sw_table, f_installed_sw, indent=2)

    ne_info["installedSw"] = installed_sw_table

    return False, None, installed_ne_sw


def generate_notification(install_process_id, install_status, installed_ne_sw_info, failed_sw_info):
    """Build the ``notifyInstallNESwStatusChanged`` notification dict.

    ``installedNESwInfo`` and ``failedSwInfo`` are only included when the
    corresponding argument is non-empty.
    """
    notification = {
        "objectClass": "EMSClass",
        "objectInstance": "EMSInstance",
        "notificationId": random.randint(1, conf.MAX_INT),
        "eventTime": time.asctime(),
        "systemDN": "emssimulator",
        "notificationType": "notifyInstallNESwStatusChanged",
        "installProcessId": install_process_id,
        "installOperationStatus": install_status
    }

    if installed_ne_sw_info:
        notification["installedNESwInfo"] = installed_ne_sw_info

    if failed_sw_info:
        notification["failedSwInfo"] = failed_sw_info

    return notification


def install_ne_sw(sw_to_be_installed, ne_id):
    """Install a software package on the NE ``ne_id`` and notify the result.

    :param sw_to_be_installed: package path, passed to ``do_install_sw``.
    :param ne_id: identifier used to look the NE up via ``ems_util``.
    :return: dict with ``installProcessId`` and ``result``; a ``reason`` entry
        is added when the NE is unknown or the installation failed.
    """
    ne_info = ems_util.get_ne_info_from_db_by_id(ne_id)

    install_process_id = random.randint(1, conf.MAX_INT)
    result = conf.REQ_SUCCESS
    ret_value = {
        "installProcessId": install_process_id,
        "result": result
    }

    if not ne_info:
        ret_value["result"] = conf.REQ_FAILURE
        ret_value["reason"] = "Can not find NE %s" % ne_id
        return ret_value

    # NOTE(review): the status stays STATUS_INSTALLING in the NE record even
    # when the installation fails - confirm whether it should be reset.
    ne_info["status"] = conf.STATUS_INSTALLING
    ems_util.update_ne_info(ne_info)

    installed_ne_sw_info = []
    failed_sw_info = []

    err, reason, installed_ne_sw = do_install_sw(sw_to_be_installed, ne_info)

    if not err:
        installed_ne_sw_info.append(installed_ne_sw)
    else:
        result = conf.REQ_FAILURE
        # do_install_sw returns no software name on failure, so report the
        # package that was requested.
        failed_sw_entry = {
            "failedSw": sw_to_be_installed,
            "failureReason": reason
        }
        failed_sw_info.append(failed_sw_entry)

    if installed_ne_sw_info and failed_sw_info:
        install_status = "NE_SWINSTALLATION_PARTIALLY_SUCCESSFUL"
    elif installed_ne_sw_info:
        install_status = "NE_SWINSTALLATION_SUCCESSFUL"
    else:
        install_status = "NE_SWINSTALLATION_FAILED"

    notification = generate_notification(install_process_id, install_status,
                                         installed_ne_sw_info, failed_sw_info)
    ems_util.send_notification(notification, install_process_id)

    if result == conf.REQ_SUCCESS:
        ems_util.update_ne_info(ne_info)
    else:
        ret_value["result"] = result
        ret_value["reason"] = json.dumps(failed_sw_info)

    # for automated software management, there is no listOfStepNumbersAndDurations
    return ret_value


def main():
    """Parse the command line, run the installation and exit with its status."""
    parser = argparse.ArgumentParser()

    parser.add_argument("--swToBeInstalled", help="The NE software to be installed", required=True)
    parser.add_argument("--neIdentifier", help="The NE where the software can be installed", required=True)

    args = parser.parse_args()

    ret_value = install_ne_sw(args.swToBeInstalled, args.neIdentifier)
    # Parenthesised single-argument form prints the same on Python 2 and 3.
    print(json.dumps(ret_value))

    if ret_value["result"] == conf.REQ_SUCCESS:
        sys.exit(conf.RET_CODE_SUCCESS)
    else:
        sys.exit(conf.RET_CODE_FAILURE)


if __name__ == '__main__':
    main()