diff options
Diffstat (limited to 'miss_htbt_service/db_monitoring.py')
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 172 |
1 files changed, 90 insertions, 82 deletions
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index df1bae7..708a2bd 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -35,56 +35,58 @@ import get_logger _logger = get_logger.get_logger(__name__) -def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target): - msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type + +def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, + closed_control_loop_name, version, target): + msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type _logger.info(msg) - if(CLType == "ONSET"): + if (CLType == "ONSET"): _logger.info("DBM:Heartbeat not received, raising alarm event") - if(target_type == "VNF"): + if (target_type == "VNF"): json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif (target_type == "VM"): json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); else: return True - elif(CLType == "ABATED"): + elif (CLType == "ABATED"): _logger.info("DBM:Heartbeat received, clearing alarm event") - #last_date_time = datetime.datetime.now() - if(target_type == "VNF"): + # last_date_time = datetime.datetime.now() + if (target_type == "VNF"): json_object = json.dumps({ "closedLoopEventClient": "DCAE_Heartbeat_MS", "policyVersion": policy_version, "policyName": policy_name, "policyScope": policy_scope, "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , + "AAI": {"generic-vnf.vnf-name": srcName}, "closedLoopAlarmStart": epoc_time, "closedLoopEventStatus": "ABATED", "closedLoopControlName": closed_control_loop_name, @@ -92,15 +94,15 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s "target": target, "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", "from": "DCAE" - }); - elif(target_type == "VM"): + }); + elif (target_type == "VM"): json_object = json.dumps({ "closedLoopEventClient": "DCAE_Heartbeat_MS", "policyVersion": policy_version, "policyName": policy_name, "policyScope": policy_scope, "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , + "AAI": {"vserver.vserver-name": srcName}, "closedLoopAlarmStart": epoc_time, "closedLoopEventStatus": "ABATED", "closedLoopControlName": closed_control_loop_name, @@ -108,54 +110,55 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s "target": target, "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", "from": "DCAE" - }); + }); else: return True else: return True payload = json_object - msg="DBM: CL Json object is", json_object + msg = "DBM: CL Json object is", json_object _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + # psend_url = pol_url+'DefaultGroup/1?timeout=15000' psend_url = pol_url - msg="DBM:",psend_url + msg = "DBM:", psend_url _logger.info(msg) - #Send response for policy on output topic + # Send response for policy on output topic try: r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason + msg = "DBM:", r.status_code, r.reason _logger.info(msg) ret = r.status_code - msg="DBM:Status code for sending the control loop event is",ret + msg = "DBM:Status code for sending the control loop event is", ret _logger.info(msg) except(Exception) as err: - msg='Message send failure : ', err + msg = 'Message send failure : ', err _logger.error(msg) return True -def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name): - while(True): + +def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name): + while (True): time.sleep(20) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): - break + break try: - with open(json_file, 'r') as outfile: - cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + with open(json_file, 'r') as outfile: + cfg = json.load(outfile) + pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) except(Exception) as err: - msg='Json file process error : ', err - _logger.error(msg) - continue + msg = 'Json file process error : ', err + _logger.error(msg) + continue - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): + if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"): _logger.info("DBM: Active DB Monitoring Instance") db_query = "Select event_name from vnf_table_1" cur.execute(db_query) @@ -165,11 +168,11 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d cur.execute(query_value) rows = cur.fetchall() hbc_state = rows[0][0] - if( hbc_state == "RECONFIGURATION"): + if (hbc_state == "RECONFIGURATION"): _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name) + db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" % (event_name) cur.execute(db_query) rows = cur.fetchall() validity_flag = rows[0][0] @@ -183,59 +186,64 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d target_type = rows[0][8] target = rows[0][9] version = rows[0][10] - comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000 - if (validity_flag ==1): + comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 + if (validity_flag == 1): for source_name_key in range(source_name_count): - epoc_time = int(round(time.time()*1000)) - epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) + epoc_time = int(round(time.time() * 1000)) + epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (event_name, (source_name_key + 1)) cur.execute(epoc_query) row = cur.fetchall() - if (len(row)==0): + if (len(row) == 0): continue epoc_time_sec = row[0][0] srcName = row[0][1] cl_flag = row[0][2] - if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): - sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + if ((epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0): + sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, + target_type, srcName, epoc_time, closed_control_loop_name, version, + target) cl_flag = 1 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) cur.execute(update_query) connection_db.commit() - elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): - sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + elif ((epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1): + sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, + target_type, srcName, epoc_time, closed_control_loop_name, version, + target) cl_flag = 0 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) cur.execute(update_query) connection_db.commit() - else: #pragma: no cover - msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name) + else: # pragma: no cover + msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % (event_name) _logger.info(msg) - delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name) + delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" % (event_name) cur.execute(delete_query_table2) - delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name) + delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" % (event_name) cur.execute(delete_query) connection_db.commit() """ Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 """ - else: #pragma: no cover - msg="DBM:Inactive instance or hb_common state is not RUNNING" + else: # pragma: no cover + msg = "DBM:Inactive instance or hb_common state is not RUNNING" _logger.info(msg) pm.commit_and_close_db(connection_db) cur.close() break; + if __name__ == "__main__": _logger.info("DBM: DBM Process started") current_pid = sys.argv[1] jsfile = sys.argv[2] ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) - msg="DBM:Parent process ID and json file name",current_pid, jsfile + msg = "DBM:Parent process ID and json file name", current_pid, jsfile _logger.info(msg) while (True): - db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name) + db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): - break + break |