diff options
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/config_notif.py | 11 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 31 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 51 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 64 |
4 files changed, 86 insertions, 71 deletions
diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py index cba898d..8da1140 100644 --- a/miss_htbt_service/config_notif.py +++ b/miss_htbt_service/config_notif.py @@ -58,8 +58,7 @@ def db_table_creation_check(connection_db, table_name): cur = None try: cur = connection_db.cursor() - query_db = "select * from information_schema.tables where table_name='%s'" % (table_name) - cur.execute(query_db) + cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,)) database_names = cur.fetchone() if (database_names is not None) and (table_name in database_names): print(f"FOUND the table {table_name}") @@ -148,8 +147,7 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): return hbc_pid, hbc_state, hbc_srcName, hbc_time connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" - cur.execute(query_value) + cur.execute("SELECT process_id, source_name, last_accessed_time, current_state FROM hb_common") rows = cur.fetchall() # TODO: What if rows returned None or empty? print("HB_Notif::hb_common contents - %s" % rows) @@ -171,9 +169,8 @@ def update_hb_common(update_flg, process_id, state, user_name, password, ip_addr return True connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % ( - process_id, source_name, current_time, state) - cur.execute(query_value) + cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s WHERE " + "PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name)) commit_and_close_db(connection_db) cur.close() return True diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index 8704966..fe47fd7 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -159,20 +159,19 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ cur = connection_db.cursor() if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"): _logger.info("DBM: Active DB Monitoring Instance") - db_query = "Select event_name from vnf_table_1" - cur.execute(db_query) + cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] for event_name in vnf_list: - query_value = "SELECT current_state FROM hb_common;" - cur.execute(query_value) + cur.execute("SELECT current_state FROM hb_common") rows = cur.fetchall() hbc_state = rows[0][0] if (hbc_state == "RECONFIGURATION"): _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" % (event_name) - cur.execute(db_query) + cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " + "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " + "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,)) rows = cur.fetchall() validity_flag = rows[0][0] source_name_count = rows[0][1] @@ -189,8 +188,8 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ if (validity_flag == 1): for source_name_key in range(source_name_count): epoc_time = int(round(time.time() * 1000)) - epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (event_name, (source_name_key + 1)) - cur.execute(epoc_query) + cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " + "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1))) row = cur.fetchall() if (len(row) == 0): continue @@ -202,26 +201,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 1 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) - cur.execute(update_query) + cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " + "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) connection_db.commit() elif ((epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1): sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 0 - update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1)) - cur.execute(update_query) + cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " + "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) connection_db.commit() else: # pragma: no cover msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % (event_name) _logger.info(msg) - delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" % (event_name) - cur.execute(delete_query_table2) - delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" % (event_name) - cur.execute(delete_query) + cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,)) + cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,)) connection_db.commit() """ Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 @@ -231,7 +228,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ _logger.info(msg) pm.commit_and_close_db(connection_db) cur.close() - break; + break if __name__ == "__main__": diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 5fa4074..bfde762 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -73,8 +73,7 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): else: connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - db_query = "Select event_name from vnf_table_1" - cur.execute(db_query) + cur.execute("SELECT event_name FROM vnf_table_1") eventnameList = [item[0] for item in cur.fetchall()] msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) @@ -137,17 +136,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if (db_table_creation_check(connection_db, "vnf_table_2") == False): msg = "HBT:Creating vnf_table_2" _logger.info(msg) - cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") + cur.execute(""" + CREATE TABLE vnf_table_2 ( + EVENT_NAME varchar, + SOURCE_NAME_KEY integer, + PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY), + LAST_EPO_TIME BIGINT, + SOURCE_NAME varchar, + CL_FLAG integer + )""") else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) if (eventName in eventnameList): # pragma: no cover - db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" % (eventName) - msg = "HBT:", db_query - _logger.info(msg) if (os.getenv('pytest', "") == 'test'): break - cur.execute(db_query) + cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,)) row = cur.fetchone() source_name_count = row[0] source_name_key = source_name_count + 1 @@ -155,20 +159,16 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if (source_name_count == 0): # pragma: no cover msg = "HBT: Insert entry in table_2,source_name_count=0 : ", row _logger.info(msg) - query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( - eventName, source_name_key, lastepo, srcname, cl_flag) - cur.execute(query_value) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % ( - source_name_key, eventName) - cur.execute(update_query) + cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag)) + cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", + (source_name_key, eventName)) else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (eventName, (source_name_key + 1)) - msg = "HBT:eppc query is", epoc_query - _logger.info(msg) - cur.execute(epoc_query) + cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " + "source_name_key = %s", (eventName, (source_name_key + 1))) row = cur.fetchall() if (len(row) == 0): continue @@ -176,8 +176,9 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if (db_srcname == srcname): msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" % (lastepo, srcname, eventName, (source_name_key + 1)) - cur.execute(update_query) + cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " + "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", + (lastepo, srcname, eventName, (source_name_key + 1))) source_name_key = source_name_count break else: @@ -188,11 +189,10 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): source_name_key = source_name_count + 1 msg = "HBT: Insert entry in table_2 : ", row _logger.info(msg) - insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( - eventName, source_name_key, lastepo, srcname, cl_flag) - cur.execute(insert_query) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % (source_name_key, eventName) - cur.execute(update_query) + cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag)) + cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + (source_name_key, eventName)) else: _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) @@ -213,8 +213,7 @@ def db_table_creation_check(connection_db, table_name): return True try: cur = connection_db.cursor() - query_db = "select * from information_schema.tables where table_name='%s'" % (table_name) - cur.execute(query_db) + cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,)) database_names = cur.fetchone() if (database_names is not None): if (table_name in database_names): diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 1808439..552e56a 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -59,16 +59,14 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password database_name = db_name con.autocommit = True cur = con.cursor() - query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" % (database_name) - cur.execute(query_value) + cur.execute("SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = %s", (database_name,)) not_exists_row = cur.fetchone() msg = "MSHBT:Create_database:DB not exists? ", not_exists_row _logger.info(msg) not_exists = not_exists_row[0] if not_exists is True: _logger.info("MSHBT:Creating database ...") - query_value = "CREATE DATABASE %s" % (database_name) - cur.execute(query_value) + cur.execute("CREATE DATABASE %s", (database_name,)) else: _logger.info("MSHBD:Database already exists") cur.close() @@ -88,8 +86,7 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): return hbc_pid, hbc_state, hbc_srcName, hbc_time connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() - query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" - cur.execute(query_value) + cur.execute("SELECT process_id, source_name, last_accessed_time, current_state FROM hb_common") rows = cur.fetchall() hbc_pid = rows[0][0] hbc_srcName = rows[0][1] @@ -109,14 +106,19 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password, connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if (heartbeat.db_table_creation_check(connection_db, "hb_common") == False): - cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);") - query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" % (process_id, source_name, current_time, state) + cur.execute(""" + CREATE TABLE hb_common ( + PROCESS_ID integer primary key, + SOURCE_NAME varchar, + LAST_ACCESSED_TIME integer, + CURRENT_STATE varchar + )""") + cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state)) _logger.info("MSHBT:Created hb_common DB and updated new values") - cur.execute(query_value) - if (update_flg == 1): - query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % (process_id, source_name, current_time, state) + elif (update_flg == 1): + cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " + "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name)) _logger.info("MSHBT:Updated hb_common DB with new values") - cur.execute(query_value) heartbeat.commit_and_close_db(connection_db) cur.close() @@ -132,15 +134,27 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): else: cur = connection_db.cursor() if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False): - cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);") + cur.execute(""" + CREATE TABLE vnf_table_1 ( + EVENT_NAME varchar primary key, + HEARTBEAT_MISSED_COUNT integer, + HEARTBEAT_INTERVAL integer, + CLOSED_CONTROL_LOOP_NAME varchar, + POLICY_VERSION varchar, + POLICY_NAME varchar, + POLICY_SCOPE varchar, + TARGET_TYPE varchar, + TARGET varchar, + VERSION varchar, + SOURCE_NAME_COUNT integer, + VALIDITY_FLAG integer + )""") _logger.info("MSHBT:Created vnf_table_1 table") if (update_db == 1): - query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;" - cur.execute(query_value) + cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1") _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table") # Put some initial values into the queue - db_query = "Select event_name from vnf_table_1" - cur.execute(db_query) + cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] for vnf in (jhbcfg['vnfs']): nfc = vnf['eventName'] @@ -156,12 +170,20 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): target = vnf['target'] version = vnf['version'] + if (envPytest == 'test'): + # skip executing SQL in test + continue if (nfc not in vnf_list): - query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" % (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, source_name_count, validity_flag) + cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, + version, source_name_count, validity_flag)) + _logger.debug("Inserted new event_name = %s into vnf_table_1", nfc) else: - query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" % (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, validity_flag, nfc) - if (envPytest != 'test'): - cur.execute(query_value) + cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, + CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s, + TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""", + (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, + validity_flag, nfc)) if (envPytest != 'test'): cur.close() _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") |