From a86243058c2daa560aebaecdb096ff63788a6f44 Mon Sep 17 00:00:00 2001 From: PrakashH Date: Mon, 28 Jan 2019 20:49:02 +0000 Subject: Heartbeat Microservice Support Heartbeat service monitors missing HB notification Issue-ID: DCAEGEN2-267 Change-Id: I0fd191b2a3495202e22f633ada4a1350a97557ad Signed-off-by: PrakashH --- miss_htbt_service/db_monitoring.py | 235 +++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 117 deletions(-) (limited to 'miss_htbt_service/db_monitoring.py') diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index 6113be2..b435f2a 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -34,19 +34,123 @@ import get_logger _logger = get_logger.get_logger(__name__) +def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target): + msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type + _logger.info(msg) + if(CLType == "ONSET"): + _logger.info("DBM:Heartbeat not received, raising alarm event") + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + return True + elif(CLType == "ABATED"): + _logger.info("DBM:Heartbeat received, clearing alarm event") + #last_date_time = datetime.datetime.now() + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + return True + else: + return True + payload = json_object + msg="DBM: CL Json object is", json_object + _logger.info(msg) + #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + psend_url = pol_url + msg="DBM:",psend_url + _logger.info(msg) + #Send response for policy on output topic + try: + r = requests.post(psend_url, data=payload) + msg="DBM:",r.status_code, r.reason + _logger.info(msg) + ret = r.status_code + msg="DBM:Status code for sending the control loop event is",ret + _logger.info(msg) + except(Exception) as err: + msg='Message send failure : ', err + _logger.error(msg) + return True + def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name): while(True): time.sleep(20) - with open(json_file, 'r') as outfile: - cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): break + + try: + with open(json_file, 'r') as outfile: + cfg = json.load(outfile) + pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + except(Exception) as err: + msg='Json file process error : ', err + _logger.error(msg) + + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) cur = connection_db.cursor() if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): @@ -89,119 +193,14 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d epoc_time_sec = row[0][0] srcName = row[0][1] cl_flag = row[0][2] - vnfName = event_name if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): - msg="DBM:Time to raise Control Loop Event for target type - ", target_type - _logger.info(msg) - if(target_type == "VNF"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - else: - continue - payload = json_object - msg="DBM: CL Json object is", json_object - _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' - psend_url = pol_url - msg="DBM:",psend_url - _logger.info(msg) - msg="DBM:DB monitoring raising alarm event "+psend_url - _logger.info(msg) - #Send response for policy on output topic - r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason - _logger.info(msg) - ret = r.status_code - msg="DBM:Status code after raising the control loop event is",ret - _logger.info(msg) + sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 1 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) connection_db.commit() elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): - msg="DBM:Time to clear Control Loop Event for target type - ", target_type - _logger.info(msg) - epoc_time = int(round(time.time())) - #last_date_time = datetime.datetime.now() - if(target_type == "VNF"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - else: - continue - payload = json_object - msg="DBM: CL Json object is", json_object - _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' - psend_url = pol_url - msg="DBM:",psend_url - _logger.info(msg) - msg="DBM:Heartbeat Dead raising alarm event "+psend_url - _logger.info(msg) - #Send response for policy on output topic - r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason - _logger.info(msg) - ret = r.status_code - msg="DBM:Status code after raising the control loop event is",ret - _logger.info(msg) + sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 0 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) @@ -211,9 +210,8 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name) _logger.info(msg) - for source_name_key in range(source_name_count): - delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s' and source_name_key=%d;" %(event_name,source_name_key) - cur.execute(delete_query_table2) + delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name) + cur.execute(delete_query_table2) delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name) cur.execute(delete_query) connection_db.commit() @@ -229,10 +227,13 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d if __name__ == "__main__": _logger.info("DBM: DBM Process started") - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties() current_pid = sys.argv[1] jsfile = sys.argv[2] + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) msg="DBM:Parent process ID and json file name",current_pid, jsfile _logger.info(msg) while (True): db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + break -- cgit 1.2.3-korg