diff options
Diffstat (limited to 'test/mocks/emssimulator/swm/installNESw')
-rwxr-xr-x | test/mocks/emssimulator/swm/installNESw | 202 |
1 files changed, 21 insertions, 181 deletions
diff --git a/test/mocks/emssimulator/swm/installNESw b/test/mocks/emssimulator/swm/installNESw index 84e2fb9ae..e56f799f7 100755 --- a/test/mocks/emssimulator/swm/installNESw +++ b/test/mocks/emssimulator/swm/installNESw @@ -1,188 +1,28 @@ -#!/usr/bin/python +#!/usr/bin/python3 +# ============LICENSE_START======================================================= +# ONAP - SO +# ================================================================================ +# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= import sys import argparse import json -import os -import shutil -import random -import time -import tempfile -import zipfile import conf -import ems_util - - -def do_install_sw(sw_to_be_installed, ne_info): - """ - return err, reason, installed_ne_sw - """ - - sw_install_dir_in_ne = conf.PNF_SIMULATORS_DIR + '/' + ne_info['omIP'] + conf.PNF_SW_INSTALL_DIR - - if sw_to_be_installed.startswith('/'): - file_location = sw_to_be_installed - else: - sw_download_dir_in_ne = ne_info.get("downloadedSwLocation", "") - file_location = sw_download_dir_in_ne + '/' + sw_to_be_installed - - if not os.access(file_location, os.R_OK): - return True, "Missing to be installed SW file %s" % file_location, None - - try: - if not os.path.isdir(sw_install_dir_in_ne): - os.makedirs(sw_install_dir_in_ne) - except OSError as e: - return True, str(e), None - - temp_dir = tempfile.mkdtemp(dir=sw_install_dir_in_ne) - if file_location.endswith(".zip"): - with zipfile.ZipFile(file_location) as sw_zip: - sw_zip.extractall(temp_dir) - else: - return True, "Only support zip file", None - - manifest_location = temp_dir + '/' + conf.MANIFEST_FILE - if os.access(manifest_location, os.R_OK): - with open(manifest_location) as f_manifest: - manifest = json.load(f_manifest) - else: - shutil.rmtree(temp_dir, ignore_errors=True) - return True, "Missing manifest file in %s" % file_location, None - - try: - target_sw_name = manifest["name"] - target_sw_version = manifest["version"] - except KeyError as e: - shutil.rmtree(temp_dir, ignore_errors=True) - return True, "Missing key %s in %s of %s" % (str(e), conf.MANIFEST_FILE, file_location), None - - if "targetSwVersion" in ne_info and ne_info["targetSwVersion"] != target_sw_version: - shutil.rmtree(temp_dir, ignore_errors=True) - return True, "Conflicted targetVersion for %s" % file_location, None - - ne_info["targetSwVersion"] = target_sw_version - ems_util.update_ne_info(ne_info) - - target_sw_parent_dir = sw_install_dir_in_ne + '/' + target_sw_version - try: - if not os.path.isdir(target_sw_parent_dir): - os.makedirs(target_sw_parent_dir) - except OSError as e: - shutil.rmtree(temp_dir, ignore_errors=True) - return True, str(e), None - - target_sw_dir = target_sw_parent_dir + '/' + target_sw_name - if os.path.isdir(target_sw_dir): - shutil.rmtree(target_sw_dir, ignore_errors=True) - - try: - shutil.move(temp_dir, target_sw_dir) - except shutil.Error as e: - shutil.rmtree(temp_dir, ignore_errors=True) - return True, str(e), None - - installed_ne_sw = target_sw_name + '-' + target_sw_version - - installed_sw_db = target_sw_parent_dir + '/' + conf.INSTALLED_SW - if os.path.isfile(installed_sw_db): - with open(installed_sw_db) as f_installed_sw: - installed_sw_table = json.load(f_installed_sw) - if not installed_sw_table: - installed_sw_table = {} - else: - installed_sw_table = {} - - target_sw_info = { - "name": target_sw_name, - "version": target_sw_version, - "installedLocation": target_sw_dir - } - installed_sw_table[installed_ne_sw] = target_sw_info - - with open(installed_sw_db, 'w') as f_installed_sw: - json.dump(installed_sw_table, f_installed_sw, indent=2) - - ne_info["installedSw"] = installed_sw_table - - return False, None, installed_ne_sw - - -def generate_notification(install_process_id, install_status, installed_ne_sw_info, failed_sw_info): - notification = { - "objectClass": "EMSClass", - "objectInstance": "EMSInstance", - "notificationId": random.randint(1, conf.MAX_INT), - "eventTime": time.asctime(), - "systemDN": "emssimulator", - "notificationType": "notifyInstallNESwStatusChanged", - "installProcessId": install_process_id, - "installOperationStatus": install_status - } - - if installed_ne_sw_info: - notification["installedNESwInfo"] = installed_ne_sw_info - - if failed_sw_info: - notification["failedSwInfo"] = failed_sw_info - - return notification - - -def install_ne_sw(sw_to_be_installed, ne_id): - ne_info = ems_util.get_ne_info_from_db_by_id(ne_id) - - install_process_id = random.randint(1, conf.MAX_INT) - result = conf.REQ_SUCCESS - ret_value = { - "installProcessId": install_process_id, - "result": result - } - - if not ne_info: - ret_value["result"] = conf.REQ_FAILURE - ret_value["reason"] = "Can not find NE %s" % ne_id - return ret_value - - ne_info["status"] = conf.STATUS_INSTALLING - ems_util.update_ne_info(ne_info) - - installed_ne_sw_info = [] - failed_sw_info = [] - - err, reason, installed_ne_sw = do_install_sw(sw_to_be_installed, ne_info) - - if not err: - installed_ne_sw_info.append(installed_ne_sw) - else: - result = conf.REQ_FAILURE - failed_sw_entry = { - "failedSw": installed_ne_sw, - "failureReason": reason - } - failed_sw_info.append(failed_sw_entry) - - num_installed_ne_sw = len(installed_ne_sw_info) - - if num_installed_ne_sw == 1: - install_status = "NE_SWINSTALLATION_SUCCESSFUL" - elif num_installed_ne_sw == 0: - install_status = "NE_SWINSTALLATION_FAILED" - else: - install_status = "NE_SWINSTALLATION_PARTIALLY_SUCCESSFUL" - - notification = generate_notification(install_process_id, install_status, installed_ne_sw_info, failed_sw_info) - ems_util.send_notification(notification, install_process_id) - - if result == conf.REQ_SUCCESS: - ems_util.update_ne_info(ne_info) - else: - ret_value["result"] = result - ret_value["reason"] = json.dumps(failed_sw_info) - - # for automated software management, there is no listOfStepNumbersAndDurations - return ret_value +import install_n_e_sw def main(): @@ -193,8 +33,8 @@ def main(): args = parser.parse_args() - ret_value = install_ne_sw(args.swToBeInstalled, args.neIdentifier) - print json.dumps(ret_value) + _, ret_value = install_n_e_sw.install(args.swToBeInstalled, args.neIdentifier) + print(json.dumps(ret_value)) if ret_value["result"] == conf.REQ_SUCCESS: sys.exit(conf.RET_CODE_SUCCESS) |