diff options
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r-- | miss_htbt_service/htbtworker.py | 138 |
1 files changed, 72 insertions, 66 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 3328973..22155a3 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -35,13 +35,14 @@ _logger = logging.getLogger(__name__) configjsonfile = "../etc/config.json" + def process_msg(cfgjsonfile, number_of_iterations=-1): """ - Function to poll events from MR continuously - and determine if matching configuration to be - tracked for missed HB function + Function to poll events from MR continuously + and determine if matching configuration to be + tracked for missed HB function """ - + global mr_url global configjsonfile configjsonfile = cfgjsonfile @@ -50,21 +51,21 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): while number_of_iterations != 0: number_of_iterations -= 1 time.sleep(sleep_duration) - print ("*** CFG json file info " + configjsonfile) + print("*** CFG json file info " + configjsonfile) with open(configjsonfile, "r") as outfile: cfg = json.load(outfile) - - print ("*** CFG info " + str(cfg)) + + print("*** CFG info " + str(cfg)) mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) - + username = str(cfg["pg_userName"]) password = str(cfg["pg_passwd"]) db_host = str(cfg["pg_ipAddress"]) db_port = cfg["pg_portNum"] database_name = str(cfg["pg_dbName"]) - - reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name) - eventname_list = get_eventnamelist () + + reconfig_Flag = check_process_reconfiguration(username, password, db_host, db_port, database_name) + eventname_list = get_eventnamelist() if "groupID" not in os.environ or "consumerID" not in os.environ: get_url = mr_url + "/DefaultGroup/1?timeout=15000" @@ -86,7 +87,7 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): if "mrstatus" in input_string: continue jlist = input_string.split("\n") - print (jlist) + print(jlist) # Process the DMaaP input message retrieved error = False jobj = [] @@ -102,15 +103,15 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): continue if len(jobj) == 0: continue - + _logger.info("HBT jobj Array : %s", jobj) for item in jobj: srcname, lastepo, seqnum, event_name = parse_event(item) msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - - check_and_create_vnf2_table () - + + check_and_create_vnf2_table() + if event_name in eventname_list: # pragma: no cover cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name) row = cur.fetchone() @@ -120,13 +121,14 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) - else: # pragma: no cover + else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): cur = sql_executor( "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", - event_name, (source_name_key + 1) + event_name, + (source_name_key + 1), ) row = cur.fetchall() if len(row) == 0: @@ -138,7 +140,10 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): sql_executor( "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - lastepo, srcname, event_name, (source_name_key + 1) + lastepo, + srcname, + event_name, + (source_name_key + 1), ) source_name_key = source_name_count break @@ -153,32 +158,30 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): else: _logger.info("HBT:eventName is not being monitored, Ignoring JSON message") msg = "HBT: Looping to check for new events from DMAAP" - print ("HBT: Looping to check for new events from DMAAP") + print("HBT: Looping to check for new events from DMAAP") _logger.info(msg) -def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag): - """ - Wrapper function to update event to tracking tables - """ - - sql_executor( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - event_name, source_name_key, lastepo, srcname, cl_flag - ) - sql_executor( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - source_name_key, event_name - ) - -def parse_event (jsonstring): + +def new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag): + """ + Wrapper function to update event to tracking tables + """ + + sql_executor( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", event_name, source_name_key, lastepo, srcname, cl_flag + ) + sql_executor("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", source_name_key, event_name) + + +def parse_event(jsonstring): """ - Function to parse incoming event as json object + Function to parse incoming event as json object and parse out required attributes """ - _logger.info("HBT jsonstring: %s", jsonstring) - #convert string to object + _logger.info("HBT jsonstring: %s", jsonstring) + # convert string to object jitem = json.loads(jsonstring) - + try: srcname = jitem["event"]["commonEventHeader"]["sourceName"] lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] @@ -189,22 +192,26 @@ def parse_event (jsonstring): event_name = jitem["event"]["commonEventHeader"]["eventName"] msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - return (srcname,lastepo,seqnum,event_name) + return (srcname, lastepo, seqnum, event_name) except Exception as err: msg = "HBT message process error - ", err _logger.error(msg) -def get_eventnamelist (): + +def get_eventnamelist(): """ Function to fetch active monitored eventname list """ - cur = sql_executor("SELECT event_name FROM vnf_table_1",) + cur = sql_executor( + "SELECT event_name FROM vnf_table_1", + ) eventname_list = [item[0] for item in cur.fetchall()] msg = "\n\nHBT:eventnameList values ", eventname_list _logger.info(msg) return eventname_list -def check_and_create_vnf2_table (): + +def check_and_create_vnf2_table(): """ Check and create vnf_table_2 used for tracking HB entries """ @@ -215,7 +222,7 @@ def check_and_create_vnf2_table (): if database_names is not None: if "vnf_table_2" in database_names: table_exist = True - + if table_exist == False: msg = "HBT:Creating vnf_table_2" _logger.info(msg) @@ -228,8 +235,8 @@ def check_and_create_vnf2_table (): LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer - )""" - ,) + )""", + ) else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) @@ -238,15 +245,14 @@ def check_and_create_vnf2_table (): msg = "COMMON:Error %s" % e _logger.error(msg) return False - + + def check_process_reconfiguration(username, password, host, port, database_name): """ Verify if DB configuration in progress """ while True: - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - username, password, host, port, database_name - ) + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(username, password, host, port, database_name) if hbc_state == "RECONFIGURATION": _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING") time.sleep(10) @@ -255,12 +261,13 @@ def check_process_reconfiguration(username, password, host, port, database_name) break return False + def postgres_db_open(): """ - Wrapper function for returning DB connection object + Wrapper function for returning DB connection object """ - - global configjsonfile + + global configjsonfile ( ip_address, @@ -275,22 +282,22 @@ def postgres_db_open(): return connection -def sql_executor (query, *values): +def sql_executor(query, *values): """ wrapper method for DB operation - """ - _logger.info("HBT query: %s values: %s", query, values) - + """ + _logger.info("HBT query: %s values: %s", query, values) + connection_db = postgres_db_open() cur = connection_db.cursor() - cur.execute(query, values) + cur.execute(query, values) update_commands = ["CREATE", "INSERT", "UPDATE"] - if any (x in query for x in update_commands): - try: - connection_db.commit() # <--- makes sure the change is shown in the database - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) + if any(x in query for x in update_commands): + try: + connection_db.commit() # <--- makes sure the change is shown in the database + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) return cur @@ -310,7 +317,6 @@ if __name__ == "__main__": configjsonfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg = "HBT:The config file name passed is -%s", configjsonfile + msg = "HBT:The config file name passed is -%s", configjsonfile _logger.info(msg) process_msg(configjsonfile) - |