# ============LICENSE_START======================================================= # ONAP - SO # ================================================================================ # Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= import os import time import json import jsonpath import conf def get_log_file(operation): return os.path.join(conf.LOGGER_FILE_DIR, "%s.txt" % operation) def get_download_dir(om_ip): return os.path.join(conf.PNF_SIMULATORS_DIR, om_ip, conf.COMMON_PATH, conf.PNF_SW_DOWNLOAD_DIR) def get_install_dir(om_ip): return os.path.join(conf.PNF_SIMULATORS_DIR, om_ip, conf.COMMON_PATH, conf.PNF_SW_INSTALL_DIR) def get_ne_info_list_from_db(ne_filter): with open(conf.NE_INFO_TABLE) as f_ne_info: ne_info_table = json.load(f_ne_info) if ne_info_table: ne_info_list = jsonpath.jsonpath(ne_info_table, ne_filter) return ne_info_list if ne_info_list else [] else: return [] def get_ne_info_from_db_by_id(ne_id): ne_filter = '$..[?(@.nEIdentification == "%s")]' % ne_id ne_info_list = get_ne_info_list_from_db(ne_filter) return ne_info_list[0] if ne_info_list else None def update_ne_info(ne_info): with open(conf.NE_INFO_TABLE) as f_ne_info: ne_info_table = json.load(f_ne_info) ne_info["updateTime"] = time.asctime() if ne_info_table: for i in range(0, len(ne_info_table)): if ne_info_table[i]["nEIdentification"] == ne_info["nEIdentification"]: ne_info_table[i] = ne_info break else: ne_info_table.append(ne_info) else: ne_info_table = [ne_info] with open(conf.NE_INFO_TABLE, 'w') as f_ne_info: json.dump(ne_info_table, f_ne_info, indent=2) def send_notification(notification, process_id): notification_file = os.path.join(conf.NOTIFICATION_DIR, '%s-%d' % (notification['notificationType'], process_id)) with open(notification_file, 'w') as f_notification: f_notification.write(json.dumps(notification))