diff options
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 237 |
1 files changed, 121 insertions, 116 deletions
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index 0d27a95..472c356 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -156,9 +156,10 @@ def sendControlLoopEvent( return True -def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name): + +def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name, sleeptime=20): while True: - time.sleep(20) + time.sleep(sleeptime) try: with open(json_file, "r") as outfile: @@ -168,125 +169,129 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ msg = "Json file process error : ", err _logger.error(msg) continue + db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name) + break + +def db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name) : + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) + + connection_db = pm.postgres_db_open() + cur = connection_db.cursor() + if ( + int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING" + ): + _logger.info("DBM: Active DB Monitoring Instance") + cur.execute("SELECT event_name FROM vnf_table_1") + vnf_list = [item[0] for item in cur.fetchall()] + for event_name in vnf_list: + cur.execute("SELECT current_state FROM hb_common") + rows = cur.fetchall() + hbc_state = rows[0][0] + if hbc_state == "RECONFIGURATION": + _logger.info("DBM:Waiting for hb_common state to become RUNNING") + break - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - user_name, password, ip_address, port_num, db_name - ) - source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) - connection_db = pm.postgres_db_open() - cur = connection_db.cursor() - if ( - int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING" - ): # pragma: no cover - _logger.info("DBM: Active DB Monitoring Instance") - cur.execute("SELECT event_name FROM vnf_table_1") - vnf_list = [item[0] for item in cur.fetchall()] - for event_name in vnf_list: - cur.execute("SELECT current_state FROM hb_common") - rows = cur.fetchall() - hbc_state = rows[0][0] - if hbc_state == "RECONFIGURATION": - _logger.info("DBM:Waiting for hb_common state to become RUNNING") - break - - cur.execute( - "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " - "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " - "target, version FROM vnf_table_1 WHERE event_name = %s", - (event_name,), - ) - rows = cur.fetchall() - validity_flag = rows[0][0] - source_name_count = rows[0][1] - heartbeat_interval = rows[0][2] - heartbeat_missed_count = rows[0][3] - closed_control_loop_name = rows[0][4] - policy_version = rows[0][5] - policy_name = rows[0][6] - policy_scope = rows[0][7] - target_type = rows[0][8] - target = rows[0][9] - version = rows[0][10] - comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 - if validity_flag == 1: - for source_name_key in range(source_name_count): - epoc_time = int(round(time.time() * 1000)) + cur.execute( + "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " + "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " + "target, version FROM vnf_table_1 WHERE event_name = %s", + (event_name,), + ) + rows = cur.fetchall() + validity_flag = rows[0][0] + source_name_count = rows[0][1] + heartbeat_interval = rows[0][2] + heartbeat_missed_count = rows[0][3] + closed_control_loop_name = rows[0][4] + policy_version = rows[0][5] + policy_name = rows[0][6] + policy_scope = rows[0][7] + target_type = rows[0][8] + target = rows[0][9] + version = rows[0][10] + comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 + if validity_flag == 1: + for source_name_key in range(source_name_count): + epoc_time = int(round(time.time() * 1000)) + cur.execute( + "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " + "event_name = %s AND source_name_key = %s", + (event_name, (source_name_key + 1)), + ) + row = cur.fetchall() + if len(row) == 0: + continue + epoc_time_sec = row[0][0] + src_name = row[0][1] + cl_flag = row[0][2] + if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: + sendControlLoopEvent( + "ONSET", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + src_name, + epoc_time, + closed_control_loop_name, + version, + target, + ) + cl_flag = 1 cur.execute( - "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " - "event_name = %s AND source_name_key = %s", - (event_name, (source_name_key + 1)), + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), ) - row = cur.fetchall() - if len(row) == 0: - continue - epoc_time_sec = row[0][0] - src_name = row[0][1] - cl_flag = row[0][2] - if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover - sendControlLoopEvent( - "ONSET", - pol_url, - policy_version, - policy_name, - policy_scope, - target_type, - src_name, - epoc_time, - closed_control_loop_name, - version, - target, - ) - cl_flag = 1 - cur.execute( - "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", - (cl_flag, event_name, (source_name_key + 1)), - ) - connection_db.commit() - elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover - sendControlLoopEvent( - "ABATED", - pol_url, - policy_version, - policy_name, - policy_scope, - target_type, - src_name, - epoc_time, - closed_control_loop_name, - version, - target, - ) - cl_flag = 0 - cur.execute( - "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", - (cl_flag, event_name, (source_name_key + 1)), - ) - connection_db.commit() + connection_db.commit() + elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: + sendControlLoopEvent( + "ABATED", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + src_name, + epoc_time, + closed_control_loop_name, + version, + target, + ) + cl_flag = 0 + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) + connection_db.commit() - else: # pragma: no cover - msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name - _logger.info(msg) + else: + msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name + _logger.info(msg) + + cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,)) + cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,)) + connection_db.commit() + """ + Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 + """ + else: + msg = "DBM:Inactive instance or hb_common state is not RUNNING" + _logger.info(msg) + try: + connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.close() + return True + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + return False + cur.close() - cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,)) - cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,)) - connection_db.commit() - """ - Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 - """ - else: # pragma: no cover - msg = "DBM:Inactive instance or hb_common state is not RUNNING" - _logger.info(msg) - try: - connection_db.commit() # <--- makes sure the change is shown in the database - connection_db.close() - return True - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) - return False - cur.close() - break def db_monitoring_wrapper(current_pid, jsfile, number_of_iterations=-1): |