From 2108563705a2ec8bb80029d36122c69fa4d06df5 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 1 Dec 2021 22:01:56 +0000 Subject: run the black formatting tool on python code also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) --- miss_htbt_service/misshtbtd.py | 257 +++++++++++++++++++++++++++-------------- 1 file changed, 172 insertions(+), 85 deletions(-) (limited to 'miss_htbt_service/misshtbtd.py') diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 6be2260..5ba0860 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -2,9 +2,9 @@ # ============LICENSE_START======================================================= # Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json" def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): from psycopg2 import connect + try: con = connect(user=user_name, host=ip_address, password=password) database_name = db_name @@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password def read_hb_common(user_name, password, ip_address, port_num, db_name): - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": hbc_pid = 10 hbc_srcName = "srvc_name" hbc_time = 1584595881 @@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name): current_time = int(round(time.time())) source_name = socket.gethostname() - source_name = source_name + "-" + os.getenv('SERVICE_NAME', "") - envPytest = os.getenv('pytest', "") - if envPytest != 'test': + source_name = source_name + "-" + os.getenv("SERVICE_NAME", "") + envPytest = os.getenv("pytest", "") + if envPytest != "test": connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "hb_common") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE hb_common ( PROCESS_ID integer primary key, SOURCE_NAME varchar, LAST_ACCESSED_TIME integer, CURRENT_STATE varchar - )""") + )""" + ) cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state)) _logger.info("MSHBT:Created hb_common DB and updated new values") elif update_flg == 1: - cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " - "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name)) + cur.execute( + "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " + "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", + (current_time, state, process_id, source_name), + ) _logger.info("MSHBT:Updated hb_common DB with new values") heartbeat.commit_and_close_db(connection_db) cur.close() def create_update_vnf_table_1(jsfile, update_db, connection_db): - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - hbcfg = cfg['heartbeat_config'] + hbcfg = cfg["heartbeat_config"] jhbcfg = json.loads(hbcfg) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] else: cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_1 ( EVENT_NAME varchar primary key, HEARTBEAT_MISSED_COUNT integer, @@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): VERSION varchar, SOURCE_NAME_COUNT integer, VALIDITY_FLAG integer - )""") + )""" + ) _logger.info("MSHBT:Created vnf_table_1 table") if update_db == 1: cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1") @@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): # Put some initial values into the queue cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] - for vnf in (jhbcfg['vnfs']): - nfc = vnf['eventName'] + for vnf in jhbcfg["vnfs"]: + nfc = vnf["eventName"] validity_flag = 1 source_name_count = 0 - missed = vnf['heartbeatcountmissed'] - intvl = vnf['heartbeatinterval'] - clloop = vnf['closedLoopControlName'] - policyVersion = vnf['policyVersion'] - policyName = vnf['policyName'] - policyScope = vnf['policyScope'] - target_type = vnf['target_type'] - target = vnf['target'] - version = vnf['version'] - - if envPytest == 'test': + missed = vnf["heartbeatcountmissed"] + intvl = vnf["heartbeatinterval"] + clloop = vnf["closedLoopControlName"] + policyVersion = vnf["policyVersion"] + policyName = vnf["policyName"] + policyScope = vnf["policyScope"] + target_type = vnf["target_type"] + target = vnf["target"] + version = vnf["version"] + + if envPytest == "test": # skip executing SQL in test continue if nfc not in vnf_list: - cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", - (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, - version, source_name_count, validity_flag)) + cur.execute( + "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ( + nfc, + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + source_name_count, + validity_flag, + ), + ) _logger.debug("Inserted new event_name = %s into vnf_table_1", nfc) else: - cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, + cur.execute( + """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s, TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""", - (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, - validity_flag, nfc)) - if envPytest != 'test': + ( + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + validity_flag, + nfc, + ), + ) + if envPytest != "test": cur.close() _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") @@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile): def read_hb_properties_default(): # Read the hbproperties.yaml for postgress and CBS related data - s = open(hb_properties_file, 'r') + s = open(hb_properties_file, "r") a = yaml.full_load(s) - if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None): - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] + if ( + (os.getenv("pg_ipAddress") is None) + or (os.getenv("pg_portNum") is None) + or (os.getenv("pg_userName") is None) + or (os.getenv("pg_passwd") is None) + ): + ip_address = a["pg_ipAddress"] + port_num = a["pg_portNum"] + user_name = a["pg_userName"] + password = a["pg_passwd"] else: - ip_address = os.getenv('pg_ipAddress') - port_num = os.getenv('pg_portNum') - user_name = os.getenv('pg_userName') - password = os.getenv('pg_passwd') + ip_address = os.getenv("pg_ipAddress") + port_num = os.getenv("pg_portNum") + user_name = os.getenv("pg_userName") + password = os.getenv("pg_passwd") - dbName = a['pg_dbName'] + dbName = a["pg_dbName"] db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] + cbs_polling_required = a["CBS_polling_allowed"] + cbs_polling_interval = a["CBS_polling_interval"] s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval def read_hb_properties(jsfile): try: - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) except Exception as err: msg = "CBS Json file load error - " + str(err) @@ -242,20 +282,20 @@ def read_hb_properties(jsfile): return read_hb_properties_default() try: - ip_address = str(cfg['pg_ipAddress']) - port_num = str(cfg['pg_portNum']) - user_name = str(cfg['pg_userName']) - password = str(cfg['pg_passwd']) - dbName = str(cfg['pg_dbName']) + ip_address = str(cfg["pg_ipAddress"]) + port_num = str(cfg["pg_portNum"]) + user_name = str(cfg["pg_userName"]) + password = str(cfg["pg_passwd"]) + dbName = str(cfg["pg_dbName"]) db_name = dbName.lower() - cbs_polling_required = str(cfg['CBS_polling_allowed']) - cbs_polling_interval = str(cfg['CBS_polling_interval']) - consumer_id = str(cfg['consumerID']) - group_id = str(cfg['groupID']) - os.environ['consumerID'] = consumer_id - os.environ['groupID'] = group_id + cbs_polling_required = str(cfg["CBS_polling_allowed"]) + cbs_polling_interval = str(cfg["CBS_polling_interval"]) + consumer_id = str(cfg["consumerID"]) + group_id = str(cfg["groupID"]) + os.environ["consumerID"] = consumer_id + os.environ["groupID"] = group_id if "SERVICE_NAME" in cfg: - os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"]) except Exception as err: msg = "CBS Json file read parameter error - " + str(err) _logger.error(msg) @@ -273,7 +313,7 @@ def fetch_json_file() -> str: # Try to get config from CBS. If succeeded, config json is stored to tds.c_config . if get_cbs_config(): # Save config to temporary file - with tempfile.NamedTemporaryFile('w', delete=False) as temp: + with tempfile.NamedTemporaryFile("w", delete=False) as temp: _logger.info("MSHBD: New config saved to temp file %s", temp.name) json.dump(tds.c_config, temp) # Swap current config with downloaded config @@ -289,8 +329,8 @@ def fetch_json_file() -> str: def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): - envPytest = os.getenv('pytest', "") - if envPytest != 'test': # pragma: no cover + envPytest = os.getenv("pytest", "") + if envPytest != "test": # pragma: no cover if update_db == 0: create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name @@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current): if len(job_list) == 0: p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) time.sleep(1) - p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,)) + p2 = multiprocessing.Process( + target=db_monitoring_process, + args=( + pid_current, + jsfile, + ), + ) p1.start() time.sleep(1) p2.start() @@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current): def main(): - get_logger.configure_logger('misshtbtd') + get_logger.configure_logger("misshtbtd") pid_current = os.getpid() hc_proc = multiprocessing.Process(target=check_health.start_health_check_server) cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,)) @@ -334,15 +380,32 @@ def main(): job_list = [] jsfile = fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) - msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = read_hb_properties(jsfile) + msg = ( + "MSHBT:HB Properties -", + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) _logger.info(msg) update_db = 0 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) state = "RECONFIGURATION" update_flg = 0 create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) - if cbs_polling_required == 'True': + if cbs_polling_required == "True": # note: cbs_polling process must be started after `hb_common` table created cbs_polling_proc.start() _logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid) @@ -350,17 +413,27 @@ def main(): _logger.info("MSHBD:Now be in a continuous loop") i = 0 while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time _logger.info(msg) current_time = int(round(time.time())) time_difference = current_time - hbc_time - msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference + msg = ( + "MSHBD:pid,srcName,state,time,ctime,timeDiff is", + hbc_pid, + hbc_srcName, + hbc_state, + hbc_time, + current_time, + time_difference, + ) _logger.info(msg) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) + envPytest = os.getenv("pytest", "") + if envPytest == "test": if i == 2: hbc_pid = pid_current source_name = hbc_srcName @@ -376,9 +449,13 @@ def main(): if hbc_state == "RUNNING": state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) elif hbc_state == "RECONFIGURATION": - _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + _logger.info( + "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes" + ) jsfile = fetch_json_file() update_db = 1 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) @@ -387,7 +464,9 @@ def main(): job_list = create_process(job_list, jsfile, pid_current) state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) else: _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") @@ -395,7 +474,11 @@ def main(): _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") else: jsfile = fetch_json_file() - msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current + msg = ( + "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", + jsfile, + pid_current, + ) _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) else: @@ -409,13 +492,17 @@ def main(): msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) update_flg = 1 - create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name + ) else: _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") time.sleep(25) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": i = i + 1 if i > 5: _logger.info("Terminating main process for pytest") @@ -451,5 +538,5 @@ def main(): cbs_polling_proc.join() -if __name__ == '__main__': +if __name__ == "__main__": main() -- cgit 1.2.3-korg