diff options
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r-- | miss_htbt_service/htbtworker.py | 271 |
1 files changed, 140 insertions, 131 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index a2ffeca..5fa4074 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -35,90 +35,92 @@ import get_logger _logger = get_logger.get_logger(__name__) + def read_json_file(i, prefix="../../tests"): - if (i==0): - with open (path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: - cfg = json.load(outfile) + if (i == 0): + with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: + cfg = json.load(outfile) elif (i == 1): - with open (path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: - cfg = json.load(outfile) - elif (i ==2): - with open( path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: - cfg = json.load(outfile) + with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: + cfg = json.load(outfile) + elif (i == 2): + with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: + cfg = json.load(outfile) return cfg + def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): global mr_url - i=0 + i = 0 sleep_duration = 20 - while(True): + while (True): time.sleep(sleep_duration) with open(jsfile, 'r') as outfile: cfg = json.load(outfile) mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) - while(True): - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - if(hbc_state == "RECONFIGURATION"): + while (True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + if (hbc_state == "RECONFIGURATION"): _logger.info("HBT:Waiting for hb_common state to become RUNNING") time.sleep(10) else: break - if(os.getenv('pytest', "") == 'test'): - eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"] - connection_db = 0 + if (os.getenv('pytest', "") == 'test'): + eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] + connection_db = 0 else: - connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) - cur = connection_db.cursor() - db_query = "Select event_name from vnf_table_1" - cur.execute(db_query) - eventnameList = [item[0] for item in cur.fetchall()] - msg="\n\nHBT:eventnameList values ", eventnameList + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) + cur = connection_db.cursor() + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + eventnameList = [item[0] for item in cur.fetchall()] + msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: - get_url = mr_url + '/DefaultGroup/1?timeout=15000' + get_url = mr_url + '/DefaultGroup/1?timeout=15000' else: - get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' - msg="HBT:Getting :"+get_url + get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' + msg = "HBT:Getting :" + get_url _logger.info(msg) - if(os.getenv('pytest', "") == 'test'): - jsonobj = read_json_file(i) - jobj = [] - jobj.append(jsonobj) - i=i+1 - msg="HBT:newly received test message", jobj - _logger.info(msg) - if (i >= 3): - i=0 - break + if (os.getenv('pytest', "") == 'test'): + jsonobj = read_json_file(i) + jobj = [] + jobj.append(jsonobj) + i = i + 1 + msg = "HBT:newly received test message", jobj + _logger.info(msg) + if (i >= 3): + i = 0 + break else: - res = requests.get(get_url) - msg="HBT:",res.text - _logger.info(msg) - inputString = res.text - #If mrstatus in message body indicates some information, not json msg. - if ("mrstatus" in inputString): - continue - jlist = inputString.split('\n'); - # Process the DMaaP input message retreived - error = False - for line in jlist: - try: - jobj = json.loads(line) - except ValueError: - msg='HBT:Decoding JSON has failed' - _logger.error(msg) - error = True - break - if (error == True): - continue - if len(jobj) == 0: - continue + res = requests.get(get_url) + msg = "HBT:", res.text + _logger.info(msg) + inputString = res.text + # If mrstatus in message body indicates some information, not json msg. + if ("mrstatus" in inputString): + continue + jlist = inputString.split('\n'); + # Process the DMaaP input message retreived + error = False + for line in jlist: + try: + jobj = json.loads(line) + except ValueError: + msg = 'HBT:Decoding JSON has failed' + _logger.error(msg) + error = True + break + if (error == True): + continue + if len(jobj) == 0: + continue for item in jobj: try: - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): jitem = jsonobj else: jitem = json.loads(item) @@ -127,91 +129,95 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): seqnum = (jitem['event']['commonEventHeader']['sequence']) eventName = (jitem['event']['commonEventHeader']['eventName']) except(Exception) as err: - msg = "HBT message process error - ",err + msg = "HBT message process error - ", err _logger.error(msg) continue - msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname + msg = "HBT:Newly received HB event values ::", eventName, lastepo, srcname _logger.info(msg) - if(db_table_creation_check(connection_db,"vnf_table_2") ==False): - msg="HBT:Creating vnf_table_2" - _logger.info(msg) - cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") + if (db_table_creation_check(connection_db, "vnf_table_2") == False): + msg = "HBT:Creating vnf_table_2" + _logger.info(msg) + cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") else: - msg="HBT:vnf_table_2 is already there" - _logger.info(msg) - if(eventName in eventnameList): #pragma: no cover - db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName) - msg="HBT:",db_query + msg = "HBT:vnf_table_2 is already there" + _logger.info(msg) + if (eventName in eventnameList): # pragma: no cover + db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" % (eventName) + msg = "HBT:", db_query + _logger.info(msg) + if (os.getenv('pytest', "") == 'test'): + break + cur.execute(db_query) + row = cur.fetchone() + source_name_count = row[0] + source_name_key = source_name_count + 1 + cl_flag = 0 + if (source_name_count == 0): # pragma: no cover + msg = "HBT: Insert entry in table_2,source_name_count=0 : ", row _logger.info(msg) - if(os.getenv('pytest', "") == 'test'): - break - cur.execute(db_query) - row = cur.fetchone() - source_name_count = row[0] - source_name_key = source_name_count+1 - cl_flag = 0 - if(source_name_count==0): #pragma: no cover - msg="HBT: Insert entry in table_2,source_name_count=0 : ",row - _logger.info(msg) - query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) - cur.execute(query_value) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) - cur.execute(update_query) - else: #pragma: no cover - msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count - _logger.info(msg) - for source_name_key in range(source_name_count): - epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1)) - msg="HBT:eppc query is",epoc_query - _logger.info(msg) - cur.execute(epoc_query) - row = cur.fetchall() - if (len(row)==0): - continue - db_srcname = row[0][0] - if (db_srcname == srcname): - msg="HBT: Update vnf_table_2 : ",source_name_key, row - _logger.info(msg) - update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1)) - cur.execute(update_query) - source_name_key = source_name_count - break - else: - continue - msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count + query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( + eventName, source_name_key, lastepo, srcname, cl_flag) + cur.execute(query_value) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % ( + source_name_key, eventName) + cur.execute(update_query) + else: # pragma: no cover + msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count + _logger.info(msg) + for source_name_key in range(source_name_count): + epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (eventName, (source_name_key + 1)) + msg = "HBT:eppc query is", epoc_query _logger.info(msg) - if (source_name_count == (source_name_key+1)): - source_name_key = source_name_count+1 - msg="HBT: Insert entry in table_2 : ",row + cur.execute(epoc_query) + row = cur.fetchall() + if (len(row) == 0): + continue + db_srcname = row[0][0] + if (db_srcname == srcname): + msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) - cur.execute(insert_query) - update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) + update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" % (lastepo, srcname, eventName, (source_name_key + 1)) cur.execute(update_query) + source_name_key = source_name_count + break + else: + continue + msg = "HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count + _logger.info(msg) + if (source_name_count == (source_name_key + 1)): + source_name_key = source_name_count + 1 + msg = "HBT: Insert entry in table_2 : ", row + _logger.info(msg) + insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % ( + eventName, source_name_key, lastepo, srcname, cl_flag) + cur.execute(insert_query) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % (source_name_key, eventName) + cur.execute(update_query) else: - _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") + _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) commit_and_close_db(connection_db) - if(os.getenv('pytest', "") != 'test'): - cur.close() + if (os.getenv('pytest', "") != 'test'): + cur.close() -def postgres_db_open(username,password,host,port,database_name): - if(os.getenv('pytest', "") == 'test'): +def postgres_db_open(username, password, host, port, database_name): + if (os.getenv('pytest', "") == 'test'): return True - connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) return connection -def db_table_creation_check(connection_db,table_name): - if(os.getenv('pytest', "") == 'test'): + +def db_table_creation_check(connection_db, table_name): + if (os.getenv('pytest', "") == 'test'): return True try: cur = connection_db.cursor() - query_db = "select * from information_schema.tables where table_name='%s'" %(table_name) + query_db = "select * from information_schema.tables where table_name='%s'" % (table_name) cur.execute(query_db) database_names = cur.fetchone() - if(database_names is not None): - if(table_name in database_names): + if (database_names is not None): + if (table_name in database_names): return True else: return False @@ -223,22 +229,24 @@ def db_table_creation_check(connection_db,table_name): finally: cur.close() + def commit_db(connection_db): - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): return True try: - connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.commit() # <--- makes sure the change is shown in the database return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = 'COMMON:Error %s' % e _logger.error(msg) return False + def commit_and_close_db(connection_db): - if(os.getenv('pytest', "") == 'test'): + if (os.getenv('pytest', "") == 'test'): return True try: - connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: @@ -246,11 +254,12 @@ def commit_and_close_db(connection_db): _logger.error(msg) return False + if __name__ == '__main__': jsfile = sys.argv[1] - msg="HBT:HeartBeat thread Created" + msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg="HBT:The config file name passed is -%s", jsfile + msg = "HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties(jsfile) - process_msg(jsfile,user_name, password, ip_address, port_num, db_name) + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + process_msg(jsfile, user_name, password, ip_address, port_num, db_name) |