aboutsummaryrefslogtreecommitdiffstats
path: root/test/mocks/emssimulator/swm/activateNESw
diff options
context:
space:
mode:
Diffstat (limited to 'test/mocks/emssimulator/swm/activateNESw')
-rwxr-xr-xtest/mocks/emssimulator/swm/activateNESw131
1 files changed, 21 insertions, 110 deletions
diff --git a/test/mocks/emssimulator/swm/activateNESw b/test/mocks/emssimulator/swm/activateNESw
index 67d233e24..3531f1634 100755
--- a/test/mocks/emssimulator/swm/activateNESw
+++ b/test/mocks/emssimulator/swm/activateNESw
@@ -1,117 +1,28 @@
-#!/usr/bin/python
+#!/usr/bin/python3
+# ============LICENSE_START=======================================================
+# ONAP - SO
+# ================================================================================
+# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
import sys
import argparse
import json
-import os
-import shutil
-import random
-import time
import conf
-import ems_util
-
-
-def do_activate_sw(sw_version_to_be_activated, ne_info):
- """
- return err, reason
- """
-
- installed_sw = ne_info.get("installedSw", {})
- if sw_version_to_be_activated in installed_sw:
- target_sw_version = installed_sw[sw_version_to_be_activated]["version"]
- else:
- target_sw_version = sw_version_to_be_activated
-
- sw_install_dir_in_ne = conf.PNF_SIMULATORS_DIR + '/' + ne_info['omIP'] + conf.PNF_SW_INSTALL_DIR
- target_sw_dir = sw_install_dir_in_ne + '/' + target_sw_version
- if not os.path.isdir(target_sw_dir):
- return True, "SW to be activated does not install"
-
- if "targetSwVersion" in ne_info:
- if ne_info["targetSwVersion"] != target_sw_version:
- return True, "Conflicted targetVersion with to be activated %s" % target_sw_version
- del ne_info["targetSwVersion"]
-
- old_sw_version = ne_info.get("oldSwVersion", "")
-
- if target_sw_version != ne_info["currentSwVersion"]:
- ne_info["oldSwVersion"] = ne_info["currentSwVersion"]
- ne_info["currentSwVersion"] = target_sw_version
- ne_info["status"] = conf.STATUS_ACTIVATING
- ems_util.update_ne_info(ne_info)
-
- if target_sw_version != old_sw_version:
- old_sw_dir = sw_install_dir_in_ne + '/' + old_sw_version
- if old_sw_version and os.path.isdir(old_sw_dir):
- shutil.rmtree(old_sw_dir, ignore_errors=True)
-
- old_cwd = os.getcwd()
- os.chdir(sw_install_dir_in_ne)
- if os.path.islink(conf.CURRENT_VERSION_DIR):
- os.remove(conf.CURRENT_VERSION_DIR)
- os.symlink(target_sw_version, conf.CURRENT_VERSION_DIR)
- os.chdir(old_cwd)
-
- if "downloadedSwLocation" in ne_info:
- if os.path.isdir(ne_info["downloadedSwLocation"]):
- shutil.rmtree(ne_info["downloadedSwLocation"], ignore_errors=True)
- del ne_info["downloadedSwLocation"]
-
- return False, None
-
-
-def generate_notification(activate_process_id, activate_status, sw_version, failure_reason):
- notification = {
- "objectClass": "EMSClass",
- "objectInstance": "EMSInstance",
- "notificationId": random.randint(1, conf.MAX_INT),
- "eventTime": time.asctime(),
- "systemDN": "emssimulator",
- "notificationType": "notifyActivateNESwStatusChanged",
- "activateProcessId": activate_process_id,
- "activateOperationStatus": activate_status,
- "swVersion": sw_version
- }
-
- if failure_reason:
- notification["failureReason"] = failure_reason
-
- return notification
-
-
-def activate_ne_sw(sw_version_to_be_activated, ne_id):
- ne_info = ems_util.get_ne_info_from_db_by_id(ne_id)
-
- activate_process_id = random.randint(1, conf.MAX_INT)
- result = conf.REQ_SUCCESS
- ret_value = {
- "activateProcessId": activate_process_id,
- "result": result
- }
-
- if not ne_info:
- ret_value["result"] = conf.REQ_FAILURE
- ret_value["reason"] = "Can not find NE %s" % ne_id
- return ret_value
-
- err, reason = do_activate_sw(sw_version_to_be_activated, ne_info)
-
- if not err:
- ne_info["status"] = conf.STATUS_ACTIVATED
- ems_util.update_ne_info(ne_info)
- activate_status = "NE_SWACTIVATION_SUCCESSFUL"
- else:
- ret_value["result"] = conf.REQ_FAILURE
- ret_value["reason"] = reason
-
- activate_status = "NE_SWACTIVATION_FAILED"
-
- notification = generate_notification(activate_process_id, activate_status, sw_version_to_be_activated, reason)
- ems_util.send_notification(notification, activate_process_id)
-
- # for automated software management, there is no listOfStepNumbersAndDurations
- return ret_value
+import activate_n_e_sw
def main():
@@ -122,8 +33,8 @@ def main():
args = parser.parse_args()
- ret_value = activate_ne_sw(args.swVersionToBeActivated, args.neIdentifier)
- print json.dumps(ret_value)
+ _, ret_value = activate_n_e_sw.activate(args.swVersionToBeActivated, args.neIdentifier)
+ print(json.dumps(ret_value))
if ret_value["result"] == conf.REQ_SUCCESS:
sys.exit(conf.RET_CODE_SUCCESS)