diff options
author | Satoshi Fujii <fujii-satoshi@jp.fujitsu.com> | 2021-06-15 13:19:51 +0000 |
---|---|---|
committer | Satoshi Fujii <fujii-satoshi@jp.fujitsu.com> | 2021-06-18 07:40:41 +0000 |
commit | d39331be3f55876a8a6fc31c5a10f2d68eefe0b0 (patch) | |
tree | 53a2009159f82d9e362f6d3204fb01a8c24756ee /miss_htbt_service | |
parent | 6c8c23c92cc9e01a652599ca9d97ffdcfd4f4883 (diff) |
Reformat code
Use 4 spaces for indentation and put spaces for better readability.
Signed-off-by: Satoshi Fujii <fujii-satoshi@jp.fujitsu.com>
Issue-ID: DCAEGEN2-2833
Change-Id: I99aa4df83a32b077e2a3f336d17b6b64184c3c12
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/cbs_polling.py | 26 | ||||
-rw-r--r-- | miss_htbt_service/check_health.py | 6 | ||||
-rw-r--r-- | miss_htbt_service/config_notif.py | 134 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 172 | ||||
-rw-r--r-- | miss_htbt_service/get_logger.py | 9 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 271 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 365 |
7 files changed, 516 insertions, 467 deletions
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py index aa6ac8d..d1253c0 100644 --- a/miss_htbt_service/cbs_polling.py +++ b/miss_htbt_service/cbs_polling.py @@ -36,32 +36,34 @@ _logger = get_logger.get_logger(__name__) def pollCBS(current_pid): jsfile = db.fetch_json_file() ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - msg="CBSP:Main process ID in hb_common is %d",hbc_pid + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + msg = "CBSP:Main process ID in hb_common is %d", hbc_pid _logger.info(msg) - msg="CBSP:My parent process ID is %d",current_pid + msg = "CBSP:My parent process ID is %d", current_pid _logger.info(msg) - msg="CBSP:CBS Polling interval is %d", cbs_polling_interval + msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval _logger.info(msg) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): - cbs_polling_interval = "30" + cbs_polling_interval = "30" time.sleep(int(cbs_polling_interval)) - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - #connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) - #cur = connection_db.cursor() + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + # connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + # cur = connection_db.cursor() source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - result= True - if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): + result = True + if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"): _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION") state = "RECONFIGURATION" update_flg = 1 - db.create_update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name) + db.create_update_hb_common(update_flg, hbc_pid, state, user_name, password, ip_address, port_num, db_name) else: _logger.info("CBSP:Inactive instance or hb_common state is not RUNNING") return result + + if __name__ == "__main__": current_pid = sys.argv[1] - while(True): + while (True): pollCBS(current_pid) diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py index 4266273..739cd2b 100644 --- a/miss_htbt_service/check_health.py +++ b/miss_htbt_service/check_health.py @@ -39,7 +39,7 @@ class GetHandler(BaseHTTPRequestHandler): 'sys_version=%s' % self.sys_version, 'protocol_version=%s' % self.protocol_version, '', - ]) + ]) self.send_response(200) self.end_headers() self.wfile.write(bytes(message, 'utf-8')) @@ -56,9 +56,11 @@ class GetHandler(BaseHTTPRequestHandler): self.wfile.write(bytes(data['health'], 'utf-8')) return + if __name__ == '__main__': from http.server import HTTPServer - #from BaseHTTPServer import HTTPServer + + # from BaseHTTPServer import HTTPServer server = HTTPServer(("", 10002), GetHandler) print('Starting server at http://localhost:10002') server.serve_forever() diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py index 913d8a5..087315d 100644 --- a/miss_htbt_service/config_notif.py +++ b/miss_htbt_service/config_notif.py @@ -36,27 +36,29 @@ import psycopg2 import mod.trapd_get_cbs_config import mod.trapd_settings as tds -hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) +hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) -def postgres_db_open(username,password,host,port,database_name): + +def postgres_db_open(username, password, host, port, database_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True try: - connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) except Exception as e: print("HB_Notif::postgress connect error: %s" % e) connection = True return connection -def db_table_creation_check(connection_db,table_name): + +def db_table_creation_check(connection_db, table_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True cur = None try: cur = connection_db.cursor() - query_db = "select * from information_schema.tables where table_name='%s'" %(table_name) + query_db = "select * from information_schema.tables where table_name='%s'" % (table_name) cur.execute(query_db) database_names = cur.fetchone() if (database_names is not None) and (table_name in database_names): @@ -73,45 +75,49 @@ def db_table_creation_check(connection_db,table_name): if cur: cur.close() + def commit_and_close_db(connection_db): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True try: - connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: return False + def read_hb_properties_default(): - #Read the hbproperties.yaml for postgress and CBS related data - s=open(hb_properties_file, 'r') - a=yaml.full_load(s) - if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] - else: - ip_address = os.getenv('pg_ipAddress') - port_num = os.getenv('pg_portNum') - user_name = os.getenv('pg_userName') - password = os.getenv('pg_passwd') + # Read the hbproperties.yaml for postgress and CBS related data + s = open(hb_properties_file, 'r') + a = yaml.full_load(s) + if ((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or ( + os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + else: + ip_address = os.getenv('pg_ipAddress') + port_num = os.getenv('pg_portNum') + user_name = os.getenv('pg_userName') + password = os.getenv('pg_passwd') + + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + # TODO: there is a mismatch here between read_hb_properties_default and read_hb_properties. + # read_hb_properties() forces all of the variables returned here to be strings, while the code here does not. + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval - dbName = a['pg_dbName'] - db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] - s.close() - # TODO: there is a mismatch here between read_hb_properties_default and read_hb_properties. - # read_hb_properties() forces all of the variables returned here to be strings, while the code here does not. - return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval def read_hb_properties(jsfile): try: with open(jsfile, 'r') as outfile: - cfg = json.load(outfile) + cfg = json.load(outfile) except(Exception) as err: print("Json file read error - %s" % err) return read_hb_properties_default() @@ -125,13 +131,14 @@ def read_hb_properties(jsfile): cbs_polling_required = str(cfg['CBS_polling_allowed']) cbs_polling_interval = str(cfg['CBS_polling_interval']) if "SERVICE_NAME" in cfg: - os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) except(Exception) as err: print("Json file read parameter error - %s" % err) return read_hb_properties_default() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval -def read_hb_common(user_name,password,ip_address,port_num,db_name): + +def read_hb_common(user_name, password, ip_address, port_num, db_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): hbc_pid = 10 @@ -139,7 +146,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): hbc_time = 1541234567 hbc_state = "RUNNING" return hbc_pid, hbc_state, hbc_srcName, hbc_time - connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" cur.execute(query_value) @@ -154,22 +161,25 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): cur.close() return hbc_pid, hbc_state, hbc_srcName, hbc_time -def update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): + +def update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name): current_time = int(round(time.time())) source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True - connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state) + query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % ( + process_id, source_name, current_time, state) cur.execute(query_value) commit_and_close_db(connection_db) cur.close() return True -def fetch_json_file(download_json = "../etc/download1.json", config_json = "../etc/config.json"): + +def fetch_json_file(download_json="../etc/download1.json", config_json="../etc/config.json"): # use the fully qualified name here to let monkeypatching work # if get_cbs_config(): if mod.trapd_get_cbs_config.get_cbs_config(): @@ -188,31 +198,33 @@ def fetch_json_file(download_json = "../etc/download1.json", config_json = "../e print("Config_N: The json file is - %s" % jsfile) return jsfile -#if __name__ == "__main__": + +# if __name__ == "__main__": def config_notif_run(): - jsfile = fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) - envPytest = os.getenv('pytest', "") - if (envPytest == 'test'): + jsfile = fetch_json_file() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties( + jsfile) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): return True - connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) - cur = connection_db.cursor() - if(db_table_creation_check(connection_db,"hb_common") == False): - print("HB_Notif::ERROR::hb_common table not exists - No config download") - connection_db.close() - else: - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) - state = "RECONFIGURATION" - update_flg = 1 - ret = update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name) - # TODO: There is no way for update_hb_common() to return false - if (ret == True): - print("HB_Notif::hb_common table updated with RECONFIGURATION state") - commit_and_close_db(connection_db) - return True - else: - print("HB_Notif::Failure updating hb_common table") - commit_and_close_db(connection_db) - return False - - cur.close() + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) + cur = connection_db.cursor() + if (db_table_creation_check(connection_db, "hb_common") == False): + print("HB_Notif::ERROR::hb_common table not exists - No config download") + connection_db.close() + else: + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + state = "RECONFIGURATION" + update_flg = 1 + ret = update_hb_common(update_flg, hbc_pid, state, user_name, password, ip_address, port_num, db_name) + # TODO: There is no way for update_hb_common() to return false + if (ret == True): + print("HB_Notif::hb_common table updated with RECONFIGURATION state") + commit_and_close_db(connection_db) + return True + else: + print("HB_Notif::Failure updating hb_common table") + commit_and_close_db(connection_db) + return False + + cur.close() diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index df1bae7..708a2bd 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -35,56 +35,58 @@ import get_logger _logger = get_logger.get_logger(__name__) -def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target): - msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type + +def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, + closed_control_loop_name, version, target): + msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type _logger.info(msg) - if(CLType == "ONSET"): + if (CLType == "ONSET"): _logger.info("DBM:Heartbeat not received, raising alarm event") - if(target_type == "VNF"): + if (target_type == "VNF"): json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif (target_type == "VM"): json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); else: return True - elif(CLType == "ABATED"): + elif (CLType == "ABATED"): _logger.info("DBM:Heartbeat received, clearing alarm event") - #last_date_time = datetime.datetime.now() - if(target_type == "VNF"): + # last_date_time = datetime.datetime.now() + if (target_type == "VNF"): json_object = json.dumps({ "closedLoopEventClient": "DCAE_Heartbeat_MS", "policyVersion": policy_version, "policyName": policy_name, "policyScope": policy_scope, "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , + "AAI": {"generic-vnf.vnf-name": srcName}, "closedLoopAlarmStart": epoc_time, "closedLoopEventStatus": "ABATED", "closedLoopControlName": closed_control_loop_name, @@ -92,15 +94,15 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s "target": target, "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", "from": "DCAE" - }); - elif(target_type == "VM"): + }); + elif (target_type == "VM"): json_object = json.dumps({ "closedLoopEventClient": "DCAE_Heartbeat_MS", "policyVersion": policy_version, "policyName": policy_name, "policyScope": policy_scope, "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , + "AAI": {"vserver.vserver-name": srcName}, "closedLoopAlarmStart": epoc_time, "closedLoopEventStatus": "ABATED", "closedLoopControlName": closed_control_loop_name, @@ -108,54 +110,55 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s "target": target, "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", "from": "DCAE" - }); + }); else: return True else: return True payload = json_object - msg="DBM: CL Json object is", json_object + msg = "DBM: CL Json object is", json_object _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + # psend_url = pol_url+'DefaultGroup/1?timeout=15000' psend_url = pol_url - msg="DBM:",psend_url + msg = "DBM:", psend_url _logger.info(msg) - #Send response for policy on output topic + # Send response for policy on output topic try: r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason + msg = "DBM:", r.status_code, r.reason _logger.info(msg) ret = r.status_code - msg="DBM:Status code for sending the control loop event is",ret + msg = "DBM:Status code for sending the control loop event is", ret _logger.info(msg) except(Exception) as err: - msg='Message send failure : ', err + msg = 'Message send failure : ', err _logger.error(msg) return True -def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name): - while(True): + +def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name): + while (True): time.sleep(20) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): - break + break try: - with open(json_file, 'r') as outfile: - cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + with open(json_file, 'r') as outfile: + cfg = json.load(outfile) + pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) except(Exception) as err: - msg='Json file process error : ', err - _logger.error(msg) - continue + msg = 'Json file process error : ', err + _logger.error(msg) + continue - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): + if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"): _logger.info("DBM: Active DB Monitoring Instance") db_query = "Select event_name from vnf_table_1" cur.execute(db_query) @@ -165,11 +168,11 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d cur.execute(query_value) rows = cur.fetchall() hbc_state = rows[0][0] - if( hbc_state == "RECONFIGURATION"): + if (hbc_state == "RECONFIGURATION"): _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name) + db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" % (event_name) cur.execute(db_query) rows = cur.fetchall() validity_flag = rows[0][0] @@ -183,59 +186,64 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d target_type = rows[0][8] target = rows[0][9] version = rows[0][10] - comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000 - if (validity_flag ==1): + comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 + if (validity_flag == 1): for source_name_key in range(source_name_count): - epoc_time = int(round(time.time()*1000)) - epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) + epoc_time = int(round(time.time() * 1000)) + epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (event_name, (source_name_key + 1)) cur.execute(epoc_query) row = cur.fetchall() - if (len(row)==0): + if (len(row) == 0): continue epoc_time_sec = row[0][0] srcName = row[0][1] cl_flag = row[0][2] - if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): - sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + if ((epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0): + sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, + target_type, srcName, epoc_time, closed_control_loop_name, version, + target) cl_flag = 1 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) cur.execute(update_query) connection_db.commit() - elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): - sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + elif ((epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1): + sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, + target_type, srcName, epoc_time, closed_control_loop_name, version, + target) cl_flag = 0 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) cur.execute(update_query) connection_db.commit() - else: #pragma: no cover - msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name) + else: # pragma: no cover + msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % (event_name) _logger.info(msg) - delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name) + delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" % (event_name) cur.execute(delete_query_table2) - delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name) + delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" % (event_name) cur.execute(delete_query) connection_db.commit() """ Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 """ - else: #pragma: no cover - msg="DBM:Inactive instance or hb_common state is not RUNNING" + else: # pragma: no cover + msg = "DBM:Inactive instance or hb_common state is not RUNNING" _logger.info(msg) pm.commit_and_close_db(connection_db) cur.close() break; + if __name__ == "__main__": _logger.info("DBM: DBM Process started") current_pid = sys.argv[1] jsfile = sys.argv[2] ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) - msg="DBM:Parent process ID and json file name",current_pid, jsfile + msg = "DBM:Parent process ID and json file name", current_pid, jsfile _logger.info(msg) while (True): - db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name) + db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): - break + break diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py index c94e333..bc8fea0 100644 --- a/miss_htbt_service/get_logger.py +++ b/miss_htbt_service/get_logger.py @@ -20,23 +20,22 @@ import logging.handlers '''Configures the module root logger''' root = logging.getLogger() if root.handlers: - #root.handlers.clear() + # root.handlers.clear() del root.handlers[:] formatter = logging.Formatter('%(asctime)s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s') handler = logging.StreamHandler() handler.setFormatter(formatter) root.addHandler(handler) -fhandler = logging.handlers.RotatingFileHandler('./hb_logs.txt', maxBytes=(1048576*5), backupCount=10) +fhandler = logging.handlers.RotatingFileHandler('./hb_logs.txt', maxBytes=(1048576 * 5), backupCount=10) fhandler.setFormatter(formatter) root.addHandler(fhandler) root.setLevel("DEBUG") + class BadEnviornmentENVNotFound(Exception): pass + def get_logger(module=None): '''Returns a module-specific logger or global logger if the module is None''' return root if module is None else root.getChild(module) - - - diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index a2ffeca..5fa4074 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -35,90 +35,92 @@ import get_logger _logger = get_logger.get_logger(__name__) + def read_json_file(i, prefix="../../tests"): - if (i==0): - with open (path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: - cfg = json.load(outfile) + if (i == 0): + with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: + cfg = json.load(outfile) elif (i == 1): - with open (path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: - cfg = json.load(outfile) - elif (i ==2): - with open( path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: - cfg = json.load(outfile) + with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: + cfg = json.load(outfile) + elif (i == 2): + with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: + cfg = json.load(outfile) return cfg + def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): global mr_url - i=0 + i = 0 sleep_duration = 20 - while(True): + while (True): time.sleep(sleep_duration) with open(jsfile, 'r') as outfile: cfg = json.load(outfile) mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) - while(True): - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - if(hbc_state == "RECONFIGURATION"): + while (True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + if (hbc_state == "RECONFIGURATION"): _logger.info("HBT:Waiting for hb_common state to become RUNNING") time.sleep(10) else: break - if(os.getenv('pytest', "") == 'test'): - eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"] - connection_db = 0 + if (os.getenv('pytest', "") == 'test'): + eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] + connection_db = 0 else: - connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) - cur = connection_db.cursor() - db_query = "Select event_name from vnf_table_1" - cur.execute(db_query) - eventnameList = [item[0] for item in cur.fetchall()] - msg="\n\nHBT:eventnameList values ", eventnameList + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) + cur = connection_db.cursor() + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + eventnameList = [item[0] for item in cur.fetchall()] + msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: - get_url = mr_url + '/DefaultGroup/1?timeout=15000' + get_url = mr_url + '/DefaultGroup/1?timeout=15000' else: - get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' - msg="HBT:Getting :"+get_url + get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' + msg = "HBT:Getting :" + get_url _logger.info(msg) - if(os.getenv('pytest', "") == 'test'): - jsonobj = read_json_file(i) - jobj = [] - jobj.append(jsonobj) - i=i+1 - msg="HBT:newly received test message", jobj - _logger.info(msg) - if (i >= 3): - i=0 - break + if (os.getenv('pytest', "") == 'test'): + jsonobj = read_json_file(i) + jobj = [] + jobj.append(jsonobj) + i = i + 1 + msg = "HBT:newly received test message", jobj + _logger.info(msg) + if (i >= 3): + i = 0 + break else: - res = requests.get(get_url) - msg="HBT:",res.text - _logger.info(msg) - inputString = res.text - #If mrstatus in message body indicates some information, not json msg. - if ("mrstatus" in inputString): - continue - jlist = inputString.split('\n'); - # Process the DMaaP input message retreived - error = False - for line in jlist: - try: - jobj = json.loads(line) - except ValueError: - msg='HBT:Decoding JSON has failed' - _logger.error(msg) - error = True - break - if (error == True): - continue - if len(jobj) == 0: - continue + res = requests.get(get_url) + msg = "HBT:", res.text + _logger.info(msg) + inputString = res.text + # If mrstatus in message body indicates some information, not json msg. + if ("mrstatus" in inputString): + continue + jlist = inputString.split('\n'); + # Process the DMaaP input message retreived + error = False + for line in jlist: + try: + jobj = json.loads(line) + except ValueError: + msg = 'HBT:Decoding JSON has failed' + _logger.error(msg) + error = True + break + if (error == True): + continue + if len(jobj) == 0: + continue for item in jobj: try: - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): jitem = jsonobj else: jitem = json.loads(item) @@ -127,91 +129,95 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): seqnum = (jitem['event']['commonEventHeader']['sequence']) eventName = (jitem['event']['commonEventHeader']['eventName']) except(Exception) as err: - msg = "HBT message process error - ",err + msg = "HBT message process error - ", err _logger.error(msg) continue - msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname + msg = "HBT:Newly received HB event values ::", eventName, lastepo, srcname _logger.info(msg) - if(db_table_creation_check(connection_db,"vnf_table_2") ==False): - msg="HBT:Creating vnf_table_2" - _logger.info(msg) - cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") + if (db_table_creation_check(connection_db, "vnf_table_2") == False): + msg = "HBT:Creating vnf_table_2" + _logger.info(msg) + cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") else: - msg="HBT:vnf_table_2 is already there" - _logger.info(msg) - if(eventName in eventnameList): #pragma: no cover - db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName) - msg="HBT:",db_query + msg = "HBT:vnf_table_2 is already there" + _logger.info(msg) + if (eventName in eventnameList): # pragma: no cover + db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" % (eventName) + msg = "HBT:", db_query + _logger.info(msg) + if (os.getenv('pytest', "") == 'test'): + break + cur.execute(db_query) + row = cur.fetchone() + source_name_count = row[0] + source_name_key = source_name_count + 1 + cl_flag = 0 + if (source_name_count == 0): # pragma: no cover + msg = "HBT: Insert entry in table_2,source_name_count=0 : ", row _logger.info(msg) - if(os.getenv('pytest', "") == 'test'): - break - cur.execute(db_query) - row = cur.fetchone() - source_name_count = row[0] - source_name_key = source_name_count+1 - cl_flag = 0 - if(source_name_count==0): #pragma: no cover - msg="HBT: Insert entry in table_2,source_name_count=0 : ",row - _logger.info(msg) - query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) - cur.execute(query_value) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) - cur.execute(update_query) - else: #pragma: no cover - msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count - _logger.info(msg) - for source_name_key in range(source_name_count): - epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1)) - msg="HBT:eppc query is",epoc_query - _logger.info(msg) - cur.execute(epoc_query) - row = cur.fetchall() - if (len(row)==0): - continue - db_srcname = row[0][0] - if (db_srcname == srcname): - msg="HBT: Update vnf_table_2 : ",source_name_key, row - _logger.info(msg) - update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1)) - cur.execute(update_query) - source_name_key = source_name_count - break - else: - continue - msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count + query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( + eventName, source_name_key, lastepo, srcname, cl_flag) + cur.execute(query_value) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % ( + source_name_key, eventName) + cur.execute(update_query) + else: # pragma: no cover + msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count + _logger.info(msg) + for source_name_key in range(source_name_count): + epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (eventName, (source_name_key + 1)) + msg = "HBT:eppc query is", epoc_query _logger.info(msg) - if (source_name_count == (source_name_key+1)): - source_name_key = source_name_count+1 - msg="HBT: Insert entry in table_2 : ",row + cur.execute(epoc_query) + row = cur.fetchall() + if (len(row) == 0): + continue + db_srcname = row[0][0] + if (db_srcname == srcname): + msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) - cur.execute(insert_query) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) + update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" % (lastepo, srcname, eventName, (source_name_key + 1)) cur.execute(update_query) + source_name_key = source_name_count + break + else: + continue + msg = "HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count + _logger.info(msg) + if (source_name_count == (source_name_key + 1)): + source_name_key = source_name_count + 1 + msg = "HBT: Insert entry in table_2 : ", row + _logger.info(msg) + insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( + eventName, source_name_key, lastepo, srcname, cl_flag) + cur.execute(insert_query) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % (source_name_key, eventName) + cur.execute(update_query) else: - _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") + _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) commit_and_close_db(connection_db) - if(os.getenv('pytest', "") != 'test'): - cur.close() + if (os.getenv('pytest', "") != 'test'): + cur.close() -def postgres_db_open(username,password,host,port,database_name): - if(os.getenv('pytest', "") == 'test'): +def postgres_db_open(username, password, host, port, database_name): + if (os.getenv('pytest', "") == 'test'): return True - connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) return connection -def db_table_creation_check(connection_db,table_name): - if(os.getenv('pytest', "") == 'test'): + +def db_table_creation_check(connection_db, table_name): + if (os.getenv('pytest', "") == 'test'): return True try: cur = connection_db.cursor() - query_db = "select * from information_schema.tables where table_name='%s'" %(table_name) + query_db = "select * from information_schema.tables where table_name='%s'" % (table_name) cur.execute(query_db) database_names = cur.fetchone() - if(database_names is not None): - if(table_name in database_names): + if (database_names is not None): + if (table_name in database_names): return True else: return False @@ -223,22 +229,24 @@ def db_table_creation_check(connection_db,table_name): finally: cur.close() + def commit_db(connection_db): - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): return True try: - connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.commit() # <--- makes sure the change is shown in the database return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = 'COMMON:Error %s' % e _logger.error(msg) return False + def commit_and_close_db(connection_db): - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): return True try: - connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: @@ -246,11 +254,12 @@ def commit_and_close_db(connection_db): _logger.error(msg) return False + if __name__ == '__main__': jsfile = sys.argv[1] - msg="HBT:HeartBeat thread Created" + msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg="HBT:The config file name passed is -%s", jsfile + msg = "HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties(jsfile) - process_msg(jsfile,user_name, password, ip_address, port_num, db_name) + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + process_msg(jsfile, user_name, password, ip_address, port_num, db_name) diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 868020c..12d439a 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -44,21 +44,22 @@ import get_logger from mod import trapd_settings as tds from mod.trapd_get_cbs_config import get_cbs_config -hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) +hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py")) ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py")) ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py")) ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py")) + def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): from psycopg2 import connect try: - con = connect(user=user_name, host = ip_address, password = password) + con = connect(user=user_name, host=ip_address, password=password) database_name = db_name con.autocommit = True cur = con.cursor() - query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name) + query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" % (database_name) cur.execute(query_value) not_exists_row = cur.fetchone() msg = "MSHBT:Create_database:DB not exists? ", not_exists_row @@ -66,7 +67,7 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password not_exists = not_exists_row[0] if not_exists is True: _logger.info("MSHBT:Creating database ...") - query_value = "CREATE DATABASE %s" %(database_name) + query_value = "CREATE DATABASE %s" % (database_name) cur.execute(query_value) else: _logger.info("MSHBD:Database already exists") @@ -81,16 +82,17 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password cur.close() con.close() except(Exception) as err: - msg = "MSHBD:DB Creation -",err + msg = "MSHBD:DB Creation -", err _logger.error(msg) -#def get_pol_and_mr_urls(jsfile, pol_url, mr_url): + +# def get_pol_and_mr_urls(jsfile, pol_url, mr_url): # with open(jsfile, 'r') as outfile: # cfg = json.load(outfile) # mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) # pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) -def read_hb_common(user_name,password,ip_address,port_num,db_name): +def read_hb_common(user_name, password, ip_address, port_num, db_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): hbc_pid = 10 @@ -98,7 +100,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): hbc_time = 1584595881 hbc_state = "RUNNING" return hbc_pid, hbc_state, hbc_srcName, hbc_time - connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" cur.execute(query_value) @@ -111,27 +113,29 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): cur.close() return hbc_pid, hbc_state, hbc_srcName, hbc_time -def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): + +def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name): current_time = int(round(time.time())) source_name = socket.gethostname() source_name = source_name + "-" + os.getenv('SERVICE_NAME', "") envPytest = os.getenv('pytest', "") if (envPytest != 'test'): - connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) - cur = connection_db.cursor() - if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False): - cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);") - query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state) - _logger.info("MSHBT:Created hb_common DB and updated new values") - cur.execute(query_value) - if(update_flg == 1): - query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state) - _logger.info("MSHBT:Updated hb_common DB with new values") - cur.execute(query_value) - heartbeat.commit_and_close_db(connection_db) - cur.close() - -def create_update_vnf_table_1(jsfile,update_db,connection_db): + connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) + cur = connection_db.cursor() + if (heartbeat.db_table_creation_check(connection_db, "hb_common") == False): + cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);") + query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" % (process_id, source_name, current_time, state) + _logger.info("MSHBT:Created hb_common DB and updated new values") + cur.execute(query_value) + if (update_flg == 1): + query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % (process_id, source_name, current_time, state) + _logger.info("MSHBT:Updated hb_common DB with new values") + cur.execute(query_value) + heartbeat.commit_and_close_db(connection_db) + cur.close() + + +def create_update_vnf_table_1(jsfile, update_db, connection_db): with open(jsfile, 'r') as outfile: cfg = json.load(outfile) hbcfg = cfg['heartbeat_config'] @@ -141,10 +145,10 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db): vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] else: cur = connection_db.cursor() - if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): + if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False): cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);") _logger.info("MSHBT:Created vnf_table_1 table") - if(update_db == 1): + if (update_db == 1): query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;" cur.execute(query_value) _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table") @@ -154,7 +158,7 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db): vnf_list = [item[0] for item in cur.fetchall()] for vnf in (jhbcfg['vnfs']): nfc = vnf['eventName'] - #_logger.error("MSHBT:",nfc) + # _logger.error("MSHBT:",nfc) validity_flag = 1 source_name_count = 0 missed = vnf['heartbeatcountmissed'] @@ -167,62 +171,69 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db): target = vnf['target'] version = vnf['version'] - if(nfc not in vnf_list): - query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag) + if (nfc not in vnf_list): + query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" % (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, source_name_count, validity_flag) else: - query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc) + query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" % (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, validity_flag, nfc) if (envPytest != 'test'): cur.execute(query_value) - #heartbeat.commit_and_close_db(connection_db) + # heartbeat.commit_and_close_db(connection_db) if (envPytest != 'test'): cur.close() _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") + def hb_cbs_polling_process(pid_current): - subprocess.call([ABSOLUTE_PATH4 , str(pid_current) ]) - sys.stdout.flush() - _logger.info("MSHBT:Creaated CBS polling process") - return + subprocess.call([ABSOLUTE_PATH4, str(pid_current)]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated CBS polling process") + return + + def hb_worker_process(config_file_path): - subprocess.call([ABSOLUTE_PATH1 , config_file_path ]) - sys.stdout.flush() - _logger.info("MSHBT:Creaated Heartbeat worker process") - return - -def db_monitoring_process(current_pid,jsfile): - subprocess.call([ABSOLUTE_PATH2 , str(current_pid),jsfile]) - sys.stdout.flush() - _logger.info("MSHBT:Creaated DB Monitoring process") - return + subprocess.call([ABSOLUTE_PATH1, config_file_path]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated Heartbeat worker process") + return + + +def db_monitoring_process(current_pid, jsfile): + subprocess.call([ABSOLUTE_PATH2, str(current_pid), jsfile]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated DB Monitoring process") + return + + def read_hb_properties_default(): - #Read the hbproperties.yaml for postgress and CBS related data - s=open(hb_properties_file, 'r') - a=yaml.full_load(s) - - if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] - else: - ip_address = os.getenv('pg_ipAddress') - port_num = os.getenv('pg_portNum') - user_name = os.getenv('pg_userName') - password = os.getenv('pg_passwd') + # Read the hbproperties.yaml for postgress and CBS related data + s = open(hb_properties_file, 'r') + a = yaml.full_load(s) + + if ((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + else: + ip_address = os.getenv('pg_ipAddress') + port_num = os.getenv('pg_portNum') + user_name = os.getenv('pg_userName') + password = os.getenv('pg_passwd') + + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval - dbName = a['pg_dbName'] - db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] - s.close() - return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval def read_hb_properties(jsfile): try: with open(jsfile, 'r') as outfile: - cfg = json.load(outfile) + cfg = json.load(outfile) except(Exception) as err: - msg = "CBS Json file load error - ",err + msg = "CBS Json file load error - ", err _logger.error(msg) return read_hb_properties_default() @@ -235,21 +246,22 @@ def read_hb_properties(jsfile): db_name = dbName.lower() cbs_polling_required = str(cfg['CBS_polling_allowed']) cbs_polling_interval = str(cfg['CBS_polling_interval']) - consumer_id = str(cfg['consumerID']) - group_id = str(cfg['groupID']) + consumer_id = str(cfg['consumerID']) + group_id = str(cfg['groupID']) os.environ['consumerID'] = consumer_id os.environ['groupID'] = group_id if "SERVICE_NAME" in cfg: - os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) except(Exception) as err: - msg = "CBS Json file read parameter error - ",err + msg = "CBS Json file read parameter error - ", err _logger.error(msg) return read_hb_properties_default() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + def fetch_json_file(): if get_cbs_config(): - #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" + # current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" envPytest = os.getenv('pytest', "") if (envPytest == 'test'): current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json" @@ -260,11 +272,11 @@ def fetch_json_file(): with open(current_runtime_config_file_name, 'w') as outfile: json.dump(tds.c_config, outfile) if os.getenv('pytest', "") == 'test': - jsfile = current_runtime_config_file_name + jsfile = current_runtime_config_file_name else: - jsfile = "../etc/config.json" - os.system('cp ../etc/download.json ../etc/config.json') - os.remove("../etc/download.json") + jsfile = "../etc/config.json" + os.system('cp ../etc/download.json ../etc/config.json') + os.remove("../etc/download.json") else: msg = "MSHBD:CBS Config not available, using local config" _logger.warning(msg) @@ -277,39 +289,43 @@ def fetch_json_file(): _logger.info(msg) return jsfile + def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): envPytest = os.getenv('pytest', "") - if (envPytest != 'test'): #pragma: no cover - if(update_db == 0): - create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + if (envPytest != 'test'): # pragma: no cover + if (update_db == 0): + create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name _logger.info(msg) - connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - if(update_db == 0): - if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): - create_update_vnf_table_1(jsfile,update_db,connection_db) + if (update_db == 0): + if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False): + create_update_vnf_table_1(jsfile, update_db, connection_db) else: - create_update_vnf_table_1(jsfile,update_db,connection_db) + create_update_vnf_table_1(jsfile, update_db, connection_db) heartbeat.commit_and_close_db(connection_db) cur.close() + def create_process(job_list, jsfile, pid_current): - if(len(job_list) == 0): - p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) + if (len(job_list) == 0): + p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) time.sleep(1) - p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,)) + p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,)) p1.start() time.sleep(1) p2.start() job_list.append(p1) job_list.append(p2) - msg = "MSHBD:jobs list is",job_list + msg = "MSHBD:jobs list is", job_list _logger.info(msg) return job_list + _logger = get_logger.get_logger(__name__) + def main(): try: subprocess.Popen([ABSOLUTE_PATH3], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -320,109 +336,110 @@ def main(): ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval _logger.info(msg) - if(cbs_polling_required == 'True'): - p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,)) - p3.start() + if (cbs_polling_required == 'True'): + p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,)) + p3.start() update_db = 0 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) state = "RECONFIGURATION" update_flg = 0 - create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) - msg = "MSHBD:Current process id is",pid_current + create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + msg = "MSHBD:Current process id is", pid_current _logger.info(msg) _logger.info("MSHBD:Now be in a continuous loop") - i=0 - while(True): - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) - msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time - _logger.info(msg) - current_time = int(round(time.time())) - time_difference = current_time - hbc_time - msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference - _logger.info(msg) - source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - envPytest = os.getenv('pytest', "") - if (envPytest == 'test'): - if i == 2: - hbc_pid = pid_current - source_name = hbc_srcName - hbc_state = "RECONFIGURATION" - elif (i>3): - hbc_pid = pid_current - source_name = hbc_srcName - hbc_state = "RUNNING" - if (time_difference <60): - if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)): - msg = "MSHBD:config status is",hbc_state + i = 0 + while (True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time + _logger.info(msg) + current_time = int(round(time.time())) + time_difference = current_time - hbc_time + msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference + _logger.info(msg) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + if i == 2: + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RECONFIGURATION" + elif (i > 3): + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RUNNING" + if (time_difference < 60): + if ((int(hbc_pid) == int(pid_current)) and (source_name == hbc_srcName)): + msg = "MSHBD:config status is", hbc_state + _logger.info(msg) + if (hbc_state == "RUNNING"): + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + elif (hbc_state == "RECONFIGURATION"): + _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + jsfile = fetch_json_file() + update_db = 1 + create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + msg = "MSHBD: parameters passed to DBM and HB are %d and %s", pid_current _logger.info(msg) - if(hbc_state=="RUNNING"): - state = "RUNNING" - update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) - elif(hbc_state=="RECONFIGURATION"): - _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") - jsfile = fetch_json_file() - update_db = 1 - create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) - msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current - _logger.info(msg) - job_list = create_process(job_list, jsfile, pid_current) - state = "RUNNING" - update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + job_list = create_process(job_list, jsfile, pid_current) + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) - else: - _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") - if(len(job_list)>=2): - _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") - else: - jsfile = fetch_json_file() - msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current - _logger.info(msg) - job_list = create_process(job_list, jsfile, pid_current) else: - _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover") - if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)): - _logger.info("MSHBD:Initiating to become Active Instance") - if(len(job_list)>=2): - _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE") - else: - jsfile = fetch_json_file() - msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current - _logger.info(msg) - job_list = create_process(job_list, jsfile, pid_current) - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) - update_flg = 1 - create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name) + _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") + if (len(job_list) >= 2): + _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") + else: + jsfile = fetch_json_file() + msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + else: + _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover") + if ((int(hbc_pid) != int(pid_current)) or (source_name != hbc_srcName)): + _logger.info("MSHBD:Initiating to become Active Instance") + if (len(job_list) >= 2): + _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE") else: - _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") - time.sleep(25) - if os.getenv('pytest', "") == 'test': - i = i + 1 - if(i > 5): - _logger.info("Terminating main process for pytest") - p3.terminate() + jsfile = fetch_json_file() + msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + update_flg = 1 + create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name) + else: + _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") + time.sleep(25) + if os.getenv('pytest', "") == 'test': + i = i + 1 + if (i > 5): + _logger.info("Terminating main process for pytest") + p3.terminate() + time.sleep(1) + p3.join() + if (len(job_list) > 0): + job_list[0].terminate() time.sleep(1) - p3.join() - if(len(job_list)>0): - job_list[0].terminate() - time.sleep(1) - job_list[0].join() - job_list.remove(job_list[0]) - if(len(job_list)>0): - job_list[0].terminate() - time.sleep(1) - job_list[0].join() - job_list.remove(job_list[0]) - break + job_list[0].join() + job_list.remove(job_list[0]) + if (len(job_list) > 0): + job_list[0].terminate() + time.sleep(1) + job_list[0].join() + job_list.remove(job_list[0]) + break except (Exception) as e: - msg = "MSHBD:Exception as %s" %(str(traceback.format_exc())) + msg = "MSHBD:Exception as %s" % (str(traceback.format_exc())) _logger.error(msg) msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e) _logger.error(msg) + if __name__ == '__main__': main() |