| author    | Satoshi Fujii <fujii-satoshi@jp.fujitsu.com> | 2021-06-15 13:19:51 +0000 |
| committer | Satoshi Fujii <fujii-satoshi@jp.fujitsu.com> | 2021-06-18 07:40:41 +0000 |
| commit    | d39331be3f55876a8a6fc31c5a10f2d68eefe0b0 (patch) | |
| tree      | 53a2009159f82d9e362f6d3204fb01a8c24756ee /miss_htbt_service/misshtbtd.py | |
| parent    | 6c8c23c92cc9e01a652599ca9d97ffdcfd4f4883 (diff) | |
Reformat code
Use 4 spaces for indentation and add spaces where appropriate for better readability.
Signed-off-by: Satoshi Fujii <fujii-satoshi@jp.fujitsu.com>
Issue-ID: DCAEGEN2-2833
Change-Id: I99aa4df83a32b077e2a3f336d17b6b64184c3c12
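The reformatting applies standard PEP 8 spacing throughout the module: 4-space indentation, a space after commas, spaces around comparison operators, a space between the `%` string-formatting operator and its argument tuple, and two blank lines between top-level functions. A minimal before/after sketch of these conventions (a hypothetical helper for illustration, not code taken from misshtbtd.py):

```python
# Before: no space after commas or around '==', and '%(' without a space:
#
#   def check_state(pid,state):
#       if(state =="RUNNING"):
#           return "pid=%d state=%s" %(pid,state)


# After: 4-space indentation, spaces after commas and around operators,
# and a space between '%' and its argument tuple.
def check_state(pid, state):
    if (state == "RUNNING"):
        return "pid=%d state=%s" % (pid, state)
    return ""


print(check_state(10, "RUNNING"))  # prints: pid=10 state=RUNNING
```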
Diffstat (limited to 'miss_htbt_service/misshtbtd.py')
| -rw-r--r-- | miss_htbt_service/misshtbtd.py | 365 |
1 file changed, 191 insertions(+), 174 deletions(-)
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 868020c..12d439a 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -44,21 +44,22 @@ import get_logger
 from mod import trapd_settings as tds
 from mod.trapd_get_cbs_config import get_cbs_config
 
-hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
+hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
 ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py"))
 ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py"))
 ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py"))
 ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py"))
 
+
 def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
     from psycopg2 import connect
     try:
-        con = connect(user=user_name, host = ip_address, password = password)
+        con = connect(user=user_name, host=ip_address, password=password)
         database_name = db_name
         con.autocommit = True
         cur = con.cursor()
-        query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name)
+        query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" % (database_name)
         cur.execute(query_value)
         not_exists_row = cur.fetchone()
         msg = "MSHBT:Create_database:DB not exists? ", not_exists_row
@@ -66,7 +67,7 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
         not_exists = not_exists_row[0]
         if not_exists is True:
             _logger.info("MSHBT:Creating database ...")
-            query_value = "CREATE DATABASE %s" %(database_name)
+            query_value = "CREATE DATABASE %s" % (database_name)
             cur.execute(query_value)
         else:
             _logger.info("MSHBD:Database already exists")
@@ -81,16 +82,17 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
         cur.close()
         con.close()
     except(Exception) as err:
-        msg = "MSHBD:DB Creation -",err
+        msg = "MSHBD:DB Creation -", err
         _logger.error(msg)
 
-#def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
+
+# def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
 #    with open(jsfile, 'r') as outfile:
 #        cfg = json.load(outfile)
 #    mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
 #    pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
 
-def read_hb_common(user_name,password,ip_address,port_num,db_name):
+def read_hb_common(user_name, password, ip_address, port_num, db_name):
     envPytest = os.getenv('pytest', "")
     if (envPytest == 'test'):
         hbc_pid = 10
@@ -98,7 +100,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
         hbc_time = 1584595881
         hbc_state = "RUNNING"
         return hbc_pid, hbc_state, hbc_srcName, hbc_time
-    connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+    connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
     cur = connection_db.cursor()
     query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;"
     cur.execute(query_value)
@@ -111,27 +113,29 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
     cur.close()
     return hbc_pid, hbc_state, hbc_srcName, hbc_time
 
-def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
+
+def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
     current_time = int(round(time.time()))
     source_name = socket.gethostname()
     source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
     envPytest = os.getenv('pytest', "")
     if (envPytest != 'test'):
-        connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
-        cur = connection_db.cursor()
-        if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False):
-            cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);")
-            query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state)
-            _logger.info("MSHBT:Created hb_common DB and updated new values")
-            cur.execute(query_value)
-        if(update_flg == 1):
-            query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state)
-            _logger.info("MSHBT:Updated hb_common DB with new values")
-            cur.execute(query_value)
-        heartbeat.commit_and_close_db(connection_db)
-        cur.close()
-
-def create_update_vnf_table_1(jsfile,update_db,connection_db):
+        connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+        cur = connection_db.cursor()
+        if (heartbeat.db_table_creation_check(connection_db, "hb_common") == False):
+            cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);")
+            query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" % (process_id, source_name, current_time, state)
+            _logger.info("MSHBT:Created hb_common DB and updated new values")
+            cur.execute(query_value)
+        if (update_flg == 1):
+            query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % (process_id, source_name, current_time, state)
+            _logger.info("MSHBT:Updated hb_common DB with new values")
+            cur.execute(query_value)
+        heartbeat.commit_and_close_db(connection_db)
+        cur.close()
+
+
+def create_update_vnf_table_1(jsfile, update_db, connection_db):
     with open(jsfile, 'r') as outfile:
         cfg = json.load(outfile)
     hbcfg = cfg['heartbeat_config']
@@ -141,10 +145,10 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
         vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
     else:
         cur = connection_db.cursor()
-        if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
+        if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False):
            cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);")
            _logger.info("MSHBT:Created vnf_table_1 table")
-        if(update_db == 1):
+        if (update_db == 1):
             query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;"
             cur.execute(query_value)
             _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table")
@@ -154,7 +158,7 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
     vnf_list = [item[0] for item in cur.fetchall()]
     for vnf in (jhbcfg['vnfs']):
         nfc = vnf['eventName']
-        #_logger.error("MSHBT:",nfc)
+        # _logger.error("MSHBT:",nfc)
         validity_flag = 1
         source_name_count = 0
         missed = vnf['heartbeatcountmissed']
@@ -167,62 +171,69 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
         target = vnf['target']
         version = vnf['version']
-        if(nfc not in vnf_list):
-            query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag)
+        if (nfc not in vnf_list):
+            query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" % (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, source_name_count, validity_flag)
         else:
-            query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc)
+            query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" % (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, validity_flag, nfc)
         if (envPytest != 'test'):
             cur.execute(query_value)
-    #heartbeat.commit_and_close_db(connection_db)
+    # heartbeat.commit_and_close_db(connection_db)
     if (envPytest != 'test'):
         cur.close()
     _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
 
+
 def hb_cbs_polling_process(pid_current):
-    subprocess.call([ABSOLUTE_PATH4 , str(pid_current) ])
-    sys.stdout.flush()
-    _logger.info("MSHBT:Creaated CBS polling process")
-    return
+    subprocess.call([ABSOLUTE_PATH4, str(pid_current)])
+    sys.stdout.flush()
+    _logger.info("MSHBT:Creaated CBS polling process")
+    return
+
+
 def hb_worker_process(config_file_path):
-    subprocess.call([ABSOLUTE_PATH1 , config_file_path ])
-    sys.stdout.flush()
-    _logger.info("MSHBT:Creaated Heartbeat worker process")
-    return
-
-def db_monitoring_process(current_pid,jsfile):
-    subprocess.call([ABSOLUTE_PATH2 , str(current_pid),jsfile])
-    sys.stdout.flush()
-    _logger.info("MSHBT:Creaated DB Monitoring process")
-    return
+    subprocess.call([ABSOLUTE_PATH1, config_file_path])
+    sys.stdout.flush()
+    _logger.info("MSHBT:Creaated Heartbeat worker process")
+    return
+
+
+def db_monitoring_process(current_pid, jsfile):
+    subprocess.call([ABSOLUTE_PATH2, str(current_pid), jsfile])
+    sys.stdout.flush()
+    _logger.info("MSHBT:Creaated DB Monitoring process")
+    return
+
+
 def read_hb_properties_default():
-    #Read the hbproperties.yaml for postgress and CBS related data
-    s=open(hb_properties_file, 'r')
-    a=yaml.full_load(s)
-
-    if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
-        ip_address = a['pg_ipAddress']
-        port_num = a['pg_portNum']
-        user_name = a['pg_userName']
-        password = a['pg_passwd']
-    else:
-        ip_address = os.getenv('pg_ipAddress')
-        port_num = os.getenv('pg_portNum')
-        user_name = os.getenv('pg_userName')
-        password = os.getenv('pg_passwd')
+    # Read the hbproperties.yaml for postgress and CBS related data
+    s = open(hb_properties_file, 'r')
+    a = yaml.full_load(s)
+
+    if ((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
+        ip_address = a['pg_ipAddress']
+        port_num = a['pg_portNum']
+        user_name = a['pg_userName']
+        password = a['pg_passwd']
+    else:
+        ip_address = os.getenv('pg_ipAddress')
+        port_num = os.getenv('pg_portNum')
+        user_name = os.getenv('pg_userName')
+        password = os.getenv('pg_passwd')
+
+    dbName = a['pg_dbName']
+    db_name = dbName.lower()
+    cbs_polling_required = a['CBS_polling_allowed']
+    cbs_polling_interval = a['CBS_polling_interval']
+    s.close()
+    return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
-    dbName = a['pg_dbName']
-    db_name = dbName.lower()
-    cbs_polling_required = a['CBS_polling_allowed']
-    cbs_polling_interval = a['CBS_polling_interval']
-    s.close()
-    return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
 
 def read_hb_properties(jsfile):
     try:
         with open(jsfile, 'r') as outfile:
-            cfg = json.load(outfile)
+            cfg = json.load(outfile)
     except(Exception) as err:
-        msg = "CBS Json file load error - ",err
+        msg = "CBS Json file load error - ", err
         _logger.error(msg)
         return read_hb_properties_default()
@@ -235,21 +246,22 @@ def read_hb_properties(jsfile):
         db_name = dbName.lower()
         cbs_polling_required = str(cfg['CBS_polling_allowed'])
         cbs_polling_interval = str(cfg['CBS_polling_interval'])
-        consumer_id = str(cfg['consumerID'])
-        group_id = str(cfg['groupID'])
+        consumer_id = str(cfg['consumerID'])
+        group_id = str(cfg['groupID'])
         os.environ['consumerID'] = consumer_id
         os.environ['groupID'] = group_id
         if "SERVICE_NAME" in cfg:
-            os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+            os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
     except(Exception) as err:
-        msg = "CBS Json file read parameter error - ",err
+        msg = "CBS Json file read parameter error - ", err
         _logger.error(msg)
         return read_hb_properties_default()
     return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
 
+
 def fetch_json_file():
     if get_cbs_config():
-        #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
+        # current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
         envPytest = os.getenv('pytest', "")
         if (envPytest == 'test'):
             current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json"
@@ -260,11 +272,11 @@ def fetch_json_file():
         with open(current_runtime_config_file_name, 'w') as outfile:
             json.dump(tds.c_config, outfile)
         if os.getenv('pytest', "") == 'test':
-            jsfile = current_runtime_config_file_name
+            jsfile = current_runtime_config_file_name
         else:
-            jsfile = "../etc/config.json"
-            os.system('cp ../etc/download.json ../etc/config.json')
-            os.remove("../etc/download.json")
+            jsfile = "../etc/config.json"
+            os.system('cp ../etc/download.json ../etc/config.json')
+            os.remove("../etc/download.json")
     else:
         msg = "MSHBD:CBS Config not available, using local config"
         _logger.warning(msg)
@@ -277,39 +289,43 @@ def fetch_json_file():
     _logger.info(msg)
     return jsfile
 
+
 def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
     envPytest = os.getenv('pytest', "")
-    if (envPytest != 'test'): #pragma: no cover
-        if(update_db == 0):
-            create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+    if (envPytest != 'test'):  # pragma: no cover
+        if (update_db == 0):
+            create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
         msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
         _logger.info(msg)
-        connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+        connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
         cur = connection_db.cursor()
-        if(update_db == 0):
-            if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
-                create_update_vnf_table_1(jsfile,update_db,connection_db)
+        if (update_db == 0):
+            if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False):
+                create_update_vnf_table_1(jsfile, update_db, connection_db)
         else:
-            create_update_vnf_table_1(jsfile,update_db,connection_db)
+            create_update_vnf_table_1(jsfile, update_db, connection_db)
         heartbeat.commit_and_close_db(connection_db)
         cur.close()
 
+
 def create_process(job_list, jsfile, pid_current):
-    if(len(job_list) == 0):
-        p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
+    if (len(job_list) == 0):
+        p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
         time.sleep(1)
-        p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,))
+        p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,))
         p1.start()
         time.sleep(1)
         p2.start()
         job_list.append(p1)
         job_list.append(p2)
-        msg = "MSHBD:jobs list is",job_list
+        msg = "MSHBD:jobs list is", job_list
         _logger.info(msg)
     return job_list
 
+
 _logger = get_logger.get_logger(__name__)
 
+
 def main():
     try:
         subprocess.Popen([ABSOLUTE_PATH3], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -320,109 +336,110 @@ def main():
         ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
         msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
         _logger.info(msg)
-        if(cbs_polling_required == 'True'):
-            p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
-            p3.start()
+        if (cbs_polling_required == 'True'):
+            p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
+            p3.start()
         update_db = 0
         create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
         state = "RECONFIGURATION"
         update_flg = 0
-        create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
-        msg = "MSHBD:Current process id is",pid_current
+        create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+        msg = "MSHBD:Current process id is", pid_current
         _logger.info(msg)
         _logger.info("MSHBD:Now be in a continuous loop")
-        i=0
-        while(True):
-            hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
-            msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time
-            _logger.info(msg)
-            current_time = int(round(time.time()))
-            time_difference = current_time - hbc_time
-            msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference
-            _logger.info(msg)
-            source_name = socket.gethostname()
-            source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
-            envPytest = os.getenv('pytest', "")
-            if (envPytest == 'test'):
-                if i == 2:
-                    hbc_pid = pid_current
-                    source_name = hbc_srcName
-                    hbc_state = "RECONFIGURATION"
-                elif (i>3):
-                    hbc_pid = pid_current
-                    source_name = hbc_srcName
-                    hbc_state = "RUNNING"
-            if (time_difference <60):
-                if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)):
-                    msg = "MSHBD:config status is",hbc_state
+        i = 0
+        while (True):
+            hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+            msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time
+            _logger.info(msg)
+            current_time = int(round(time.time()))
+            time_difference = current_time - hbc_time
+            msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference
+            _logger.info(msg)
+            source_name = socket.gethostname()
+            source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+            envPytest = os.getenv('pytest', "")
+            if (envPytest == 'test'):
+                if i == 2:
+                    hbc_pid = pid_current
+                    source_name = hbc_srcName
+                    hbc_state = "RECONFIGURATION"
+                elif (i > 3):
+                    hbc_pid = pid_current
+                    source_name = hbc_srcName
+                    hbc_state = "RUNNING"
+            if (time_difference < 60):
+                if ((int(hbc_pid) == int(pid_current)) and (source_name == hbc_srcName)):
+                    msg = "MSHBD:config status is", hbc_state
+                    _logger.info(msg)
+                    if (hbc_state == "RUNNING"):
+                        state = "RUNNING"
+                        update_flg = 1
+                        create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+                    elif (hbc_state == "RECONFIGURATION"):
+                        _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+                        jsfile = fetch_json_file()
+                        update_db = 1
+                        create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+                        msg = "MSHBD: parameters passed to DBM and HB are %d and %s", pid_current
                         _logger.info(msg)
-                    if(hbc_state=="RUNNING"):
-                        state = "RUNNING"
-                        update_flg = 1
-                        create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
-                    elif(hbc_state=="RECONFIGURATION"):
-                        _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
-                        jsfile = fetch_json_file()
-                        update_db = 1
-                        create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
-                        msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current
-                        _logger.info(msg)
-                        job_list = create_process(job_list, jsfile, pid_current)
-                        state = "RUNNING"
-                        update_flg = 1
-                        create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
+                        job_list = create_process(job_list, jsfile, pid_current)
+                        state = "RUNNING"
+                        update_flg = 1
+                        create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
-                else:
-                    _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
-                    if(len(job_list)>=2):
-                        _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
-                    else:
-                        jsfile = fetch_json_file()
-                        msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current
-                        _logger.info(msg)
-                        job_list = create_process(job_list, jsfile, pid_current)
                 else:
-                    _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover")
-                    if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)):
-                        _logger.info("MSHBD:Initiating to become Active Instance")
-                        if(len(job_list)>=2):
-                            _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
-                        else:
-                            jsfile = fetch_json_file()
-                            msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current
-                            _logger.info(msg)
-                            job_list = create_process(job_list, jsfile, pid_current)
-                        hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
-                        update_flg = 1
-                        create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name)
+                    _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
+                    if (len(job_list) >= 2):
+                        _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
+                    else:
+                        jsfile = fetch_json_file()
+                        msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current
+                        _logger.info(msg)
+                        job_list = create_process(job_list, jsfile, pid_current)
+            else:
+                _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover")
+                if ((int(hbc_pid) != int(pid_current)) or (source_name != hbc_srcName)):
+                    _logger.info("MSHBD:Initiating to become Active Instance")
+                    if (len(job_list) >= 2):
+                        _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
                     else:
-                        _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
-            time.sleep(25)
-            if os.getenv('pytest', "") == 'test':
-                i = i + 1
-                if(i > 5):
-                    _logger.info("Terminating main process for pytest")
-                    p3.terminate()
+                        jsfile = fetch_json_file()
+                        msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
+                        _logger.info(msg)
+                        job_list = create_process(job_list, jsfile, pid_current)
+                    hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+                    update_flg = 1
+                    create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name)
+                else:
+                    _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
+            time.sleep(25)
+            if os.getenv('pytest', "") == 'test':
+                i = i + 1
+                if (i > 5):
+                    _logger.info("Terminating main process for pytest")
+                    p3.terminate()
+                    time.sleep(1)
+                    p3.join()
+                    if (len(job_list) > 0):
+                        job_list[0].terminate()
                         time.sleep(1)
-                    p3.join()
-                    if(len(job_list)>0):
-                        job_list[0].terminate()
-                        time.sleep(1)
-                        job_list[0].join()
-                        job_list.remove(job_list[0])
-                    if(len(job_list)>0):
-                        job_list[0].terminate()
-                        time.sleep(1)
-                        job_list[0].join()
-                        job_list.remove(job_list[0])
-                    break
+                        job_list[0].join()
+                        job_list.remove(job_list[0])
+                    if (len(job_list) > 0):
+                        job_list[0].terminate()
+                        time.sleep(1)
+                        job_list[0].join()
+                        job_list.remove(job_list[0])
+                    break
     except (Exception) as e:
-        msg = "MSHBD:Exception as %s" %(str(traceback.format_exc()))
+        msg = "MSHBD:Exception as %s" % (str(traceback.format_exc()))
         _logger.error(msg)
         msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e)
         _logger.error(msg)
 
+
 if __name__ == '__main__':
     main()