From 235db9ca1ded4f36284108e8f95c6abe64bcd462 Mon Sep 17 00:00:00 2001 From: PrakashH Date: Thu, 31 Jan 2019 19:34:38 +0000 Subject: Heartbeat Microservice CL-OUT support Support the control loop having heartbeat event Issue-ID: DCAEGEN2-1138 Change-Id: I14f4d6bb723794450fbaf3c86f063148364bc023 Signed-off-by: PrakashH --- miss_htbt_service/db_monitoring.py | 72 ++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 11 deletions(-) (limited to 'miss_htbt_service/db_monitoring.py') diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index b435f2a..4eb33ff 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -34,6 +34,37 @@ import get_logger _logger = get_logger.get_logger(__name__) +def sendHBControlLoopEventV2(pol_url, closed_control_loop_name, closedLoopFlag, policy_name, policy_type, last_recvd_hb_event): + msg="DBM:Time to send Control Loop Event, CL name, CL flag, policy name/type - ",closed_control_loop_name, closedLoopFlag, policy_name, policy_type + _logger.info(msg) + try: + jpayload = json.loads(last_recvd_hb_event) + jpayload['event']['commonEventHeader']['internalHeaderFields'] = {} + jpayload['event']['commonEventHeader']['internalHeaderFields']['closedLoopControlName'] = closed_control_loop_name + jpayload['event']['commonEventHeader']['internalHeaderFields']['closedLoopFlag'] = closedLoopFlag + jpayload['event']['commonEventHeader']['internalHeaderFields']['policyName'] = policy_name + jpayload['event']['commonEventHeader']['internalHeaderFields']['policyType'] = policy_type + jpayload['event']['commonEventHeader']['internalHeaderFields']['internalHeaderFieldsVersion'] = '2.0' + msg="DBM: HB1 Json object is", jpayload + _logger.info(msg) + payload = json.dumps(jpayload) + msg="DBM: HB2 Json object is", payload + _logger.info(msg) + psend_url = pol_url + msg="DBM:",psend_url + _logger.info(msg) + r = requests.post(psend_url, data=payload) + msg="DBM:",r.status_code, r.reason + _logger.info(msg) + ret = r.status_code + msg="DBM:Status code for sending the control loop event is",ret + _logger.info(msg) + except(Exception) as err: + msg = "HBT message process error - ",err + _logger.error(msg) + return True + + def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target): msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type _logger.info(msg) @@ -147,6 +178,7 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d except(Exception) as err: msg='Json file process error : ', err _logger.error(msg) + continue hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) source_name = socket.gethostname() @@ -166,8 +198,10 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d if( hbc_state == "RECONFIGURATION"): _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - - db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name) + if(os.getenv('cl_out_hb', "") == 'yes'): + db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_name,policy_type,closed_loop_flag from vnf_table_1 where event_name= '%s'" %(event_name) + else: + db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_name, policy_version,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name) cur.execute(db_query) rows = cur.fetchall() validity_flag = rows[0][0] @@ -175,17 +209,25 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d heartbeat_interval = rows[0][2] heartbeat_missed_count = rows[0][3] closed_control_loop_name = rows[0][4] - policy_version = rows[0][5] - policy_name = rows[0][6] - policy_scope = rows[0][7] - target_type = rows[0][8] - target = rows[0][9] - version = rows[0][10] + policy_name = rows[0][5] + + if(os.getenv('cl_out_hb', "") == 'yes'): + policy_type = rows[0][6] + closed_loop_flag = rows[0][7] + else: + policy_version = rows[0][6] + policy_scope = rows[0][7] + target_type = rows[0][8] + target = rows[0][9] + version = rows[0][10] comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000 if (validity_flag ==1): for source_name_key in range(source_name_count): epoc_time = int(round(time.time()*1000)) - epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) + if(os.getenv('cl_out_hb', "") == 'yes'): + epoc_query = "Select last_epo_time,source_name,cl_flag,last_recvd_hb_event from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) + else: + epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) cur.execute(epoc_query) row = cur.fetchall() if (len(row)==0): @@ -193,14 +235,22 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d epoc_time_sec = row[0][0] srcName = row[0][1] cl_flag = row[0][2] + if(os.getenv('cl_out_hb', "") == 'yes'): + last_recvd_hb_event = row[0][3] if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): - sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + if(os.getenv('cl_out_hb', "") == 'yes'): + sendHBControlLoopEventV2(pol_url, closed_control_loop_name, closed_loop_flag, policy_name, policy_type, last_recvd_hb_event) + else: + sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 1 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) connection_db.commit() elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): - sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + if(os.getenv('cl_out_hb', "") == 'yes'): + sendHBControlLoopEventV2(pol_url, closed_control_loop_name, closed_loop_flag, policy_name, policy_type, last_recvd_hb_event) + else: + sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 0 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) -- cgit 1.2.3-korg