diff options
Diffstat (limited to 'test/mocks/emssimulator/swm/activateNESw')
-rwxr-xr-x | test/mocks/emssimulator/swm/activateNESw | 131 |
1 files changed, 21 insertions, 110 deletions
diff --git a/test/mocks/emssimulator/swm/activateNESw b/test/mocks/emssimulator/swm/activateNESw index 67d233e24..3531f1634 100755 --- a/test/mocks/emssimulator/swm/activateNESw +++ b/test/mocks/emssimulator/swm/activateNESw @@ -1,117 +1,28 @@ -#!/usr/bin/python +#!/usr/bin/python3 +# ============LICENSE_START======================================================= +# ONAP - SO +# ================================================================================ +# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= import sys import argparse import json -import os -import shutil -import random -import time import conf -import ems_util - - -def do_activate_sw(sw_version_to_be_activated, ne_info): - """ - return err, reason - """ - - installed_sw = ne_info.get("installedSw", {}) - if sw_version_to_be_activated in installed_sw: - target_sw_version = installed_sw[sw_version_to_be_activated]["version"] - else: - target_sw_version = sw_version_to_be_activated - - sw_install_dir_in_ne = conf.PNF_SIMULATORS_DIR + '/' + ne_info['omIP'] + conf.PNF_SW_INSTALL_DIR - target_sw_dir = sw_install_dir_in_ne + '/' + target_sw_version - if not os.path.isdir(target_sw_dir): - return True, "SW to be activated does not install" - - if "targetSwVersion" in ne_info: - if ne_info["targetSwVersion"] != target_sw_version: - return True, "Conflicted targetVersion with to be activated %s" % target_sw_version - del ne_info["targetSwVersion"] - - old_sw_version = ne_info.get("oldSwVersion", "") - - if target_sw_version != ne_info["currentSwVersion"]: - ne_info["oldSwVersion"] = ne_info["currentSwVersion"] - ne_info["currentSwVersion"] = target_sw_version - ne_info["status"] = conf.STATUS_ACTIVATING - ems_util.update_ne_info(ne_info) - - if target_sw_version != old_sw_version: - old_sw_dir = sw_install_dir_in_ne + '/' + old_sw_version - if old_sw_version and os.path.isdir(old_sw_dir): - shutil.rmtree(old_sw_dir, ignore_errors=True) - - old_cwd = os.getcwd() - os.chdir(sw_install_dir_in_ne) - if os.path.islink(conf.CURRENT_VERSION_DIR): - os.remove(conf.CURRENT_VERSION_DIR) - os.symlink(target_sw_version, conf.CURRENT_VERSION_DIR) - os.chdir(old_cwd) - - if "downloadedSwLocation" in ne_info: - if os.path.isdir(ne_info["downloadedSwLocation"]): - shutil.rmtree(ne_info["downloadedSwLocation"], ignore_errors=True) - del ne_info["downloadedSwLocation"] - - return False, None - - -def generate_notification(activate_process_id, activate_status, sw_version, failure_reason): - notification = { - "objectClass": "EMSClass", - "objectInstance": "EMSInstance", - "notificationId": random.randint(1, conf.MAX_INT), - "eventTime": time.asctime(), - "systemDN": "emssimulator", - "notificationType": "notifyActivateNESwStatusChanged", - "activateProcessId": activate_process_id, - "activateOperationStatus": activate_status, - "swVersion": sw_version - } - - if failure_reason: - notification["failureReason"] = failure_reason - - return notification - - -def activate_ne_sw(sw_version_to_be_activated, ne_id): - ne_info = ems_util.get_ne_info_from_db_by_id(ne_id) - - activate_process_id = random.randint(1, conf.MAX_INT) - result = conf.REQ_SUCCESS - ret_value = { - "activateProcessId": activate_process_id, - "result": result - } - - if not ne_info: - ret_value["result"] = conf.REQ_FAILURE - ret_value["reason"] = "Can not find NE %s" % ne_id - return ret_value - - err, reason = do_activate_sw(sw_version_to_be_activated, ne_info) - - if not err: - ne_info["status"] = conf.STATUS_ACTIVATED - ems_util.update_ne_info(ne_info) - activate_status = "NE_SWACTIVATION_SUCCESSFUL" - else: - ret_value["result"] = conf.REQ_FAILURE - ret_value["reason"] = reason - - activate_status = "NE_SWACTIVATION_FAILED" - - notification = generate_notification(activate_process_id, activate_status, sw_version_to_be_activated, reason) - ems_util.send_notification(notification, activate_process_id) - - # for automated software management, there is no listOfStepNumbersAndDurations - return ret_value +import activate_n_e_sw def main(): @@ -122,8 +33,8 @@ def main(): args = parser.parse_args() - ret_value = activate_ne_sw(args.swVersionToBeActivated, args.neIdentifier) - print json.dumps(ret_value) + _, ret_value = activate_n_e_sw.activate(args.swVersionToBeActivated, args.neIdentifier) + print(json.dumps(ret_value)) if ret_value["result"] == conf.REQ_SUCCESS: sys.exit(conf.RET_CODE_SUCCESS) |