Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r-- (was -rwxr-xr-x)   miss_htbt_service/htbtworker.py   422
1 file changed, 220 insertions, 202 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 6123386..5b62943 100755..100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-# Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,217 +13,235 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-# Author Gokul Singaraju gs244f@att.com
+# Author Prakash Hosangady(ph553f@att.com)
 # Simple Microservice
 # Tracks Heartbeat messages on input topic in DMaaP
-# and generates Missing Heartbeat signal for Policy Engine
+# and poppulate the information in postgres DB
+import psycopg2
 import requests
-import math
-import sched, datetime, time
-import json
-import string
-import sys
+import os
+import json,sys,time
+import misshtbtd as db
+import logging
+import get_logger
+import os.path as path
+_logger = get_logger.get_logger(__name__)
-# Initialise tracking hash tables
-intvl = 60
-missing_htbt = 2
-#tracks last epoch time
-hearttrack = {}
-#tracks sequence number
-heartstate = {}
-#tracks sequence number differences
-heartflag = {}
-#saves heartbeat message for policy
-heartmsg = {}
-mr_url = 'http://mrrouter.onap.org:3904'
-pol_url = 'http://mrrouter.onap.org:3904'
-intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
-outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
-nfc = "vVNF"
-cl_loop = 'ControlLoopEvent1'
-periodic_scheduler = None
+def read_json_file(i):
+    if (i==0):
+        with open (path.abspath(path.join(__file__, "../../tests/test1.json")), "r") as outfile:
+            cfg = json.load(outfile)
+    elif (i == 1):
+        with open (path.abspath(path.join(__file__, "../../tests/test2.json")), "r") as outfile:
+            cfg = json.load(outfile)
+    elif (i ==2):
+        with open( path.abspath(path.join(__file__, "../../tests/test3.json")), 'r') as outfile:
+            cfg = json.load(outfile)
+    return cfg
-# Checks for heartbeat event on periodic basis
-class PeriodicScheduler(object):
-    def __init__(self):
-        self.scheduler = sched.scheduler(time.time, time.sleep)
-
-    def setup(self, interval, action, actionargs=()):
-        #print("Args are :", locals())
-        action(*actionargs)
-        self.scheduler.enter(interval, 1, self.setup,(interval, action, actionargs))
-    def run(self):
-        self.scheduler.run()
+def process_msg(jsfile,user_name, password, ip_address, port_num, db_name):
+    global mr_url
+    i=0
+    sleep_duration = 10
+    while(True):
+        time.sleep(sleep_duration)
+        with open(jsfile, 'r') as outfile:
+            cfg = json.load(outfile)
+        mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'])
-    def stop(self):
-        list(map(self.scheduler.cancel, self.scheduler.queue))
+        while(True):
+            hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+            if(hbc_state == "RECONFIGURATION"):
+                _logger.info("HBT:Waiting for hb_common state to become RUNNING")
+                time.sleep(10)
+            else:
+                break
+
+        if(os.getenv('pytest', "") == 'test'):
+            eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"]
+        else:
+            connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
+            cur = connection_db.cursor()
+            db_query = "Select event_name from vnf_table_1"
+            cur.execute(db_query)
+            eventnameList = [item[0] for item in cur.fetchall()]
+        msg="\n\nHBT:eventnameList values ", eventnameList
+        _logger.info(msg)
+        if "groupID" not in os.environ or "consumerID" not in os.environ:
+            get_url = mr_url + 'DefaultGroup/1?timeout=15000'
+        else:
+            get_url = mr_url + os.getenv('groupID') + '/' + os.getenv('consumerID') + '?timeout=15000'
+        msg="HBT:Getting :"+get_url
+        _logger.info(msg)
+        if(os.getenv('pytest', "") == 'test'):
+            jsonobj = read_json_file(i)
+            jobj = []
+            jobj.append(jsonobj)
+            i=i+1
+            msg="HBT:newly received test message", jobj
+            _logger.info(msg)
+            if (i >= 3):
+                i=0
+                break
+        else:
+            res = requests.get(get_url)
+            msg="HBT:",res.text
+            _logger.info(msg)
+            inputString = res.text
+            #If mrstatus in message body indicates some information, not json msg.
+            if ("mrstatus" in inputString):
+                if (sleep_duration < 60):
+                    sleep_duration = sleep_duration + 10
+                continue
+            jlist = inputString.split('\n');
+        # Process the DMaaP input message retreived
+        for line in jlist:
+            try:
+                jobj = json.loads(line)
+            except ValueError:
+                msg='HBT:Decoding JSON has failed'
+                _logger.error(msg)
+                continue
+            if len(jobj) == 0:
+                continue
+            for item in jobj:
+                try:
+                    if(os.getenv('pytest', "") == 'test'):
+                        jitem = jsonobj
+                    else:
+                        jitem = json.loads(item)
+                    srcname = (jitem['event']['commonEventHeader']['sourceName'])
+                    lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+                    seqnum = (jitem['event']['commonEventHeader']['sequence'])
+                    eventName = (jitem['event']['commonEventHeader']['eventName'])
+                except(Exception) as err:
+                    msg = "HBT message process error - ",err
+                    _logger.error(msg)
+                    continue
+                msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname
+                _logger.info(msg)
+                if(db_table_creation_check(connection_db,"vnf_table_2") ==False):
+                    msg="HBT:Creating vnf_table_2"
+                    _logger.info(msg)
+                    cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);")
+                else:
+                    msg="HBT:vnf_table_2 is already there"
+                    _logger.info(msg)
+                if(eventName in eventnameList):
+                    db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName)
+                    msg="HBT:",db_query
+                    _logger.info(msg)
+                    if(os.getenv('pytest', "") == 'test'):
+                        break
+                    cur.execute(db_query)
+                    row = cur.fetchone()
+                    source_name_count = row[0]
+                    source_name_key = source_name_count+1
+                    cl_flag = 0
+                    if(source_name_count==0):
+                        msg="HBT: Insert entry in table_2,source_name_count=0 : ",row
+                        _logger.info(msg)
+                        query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+                        cur.execute(query_value)
+                        update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+                        cur.execute(update_query)
+                    else:
+                        msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count
+                        _logger.info(msg)
+                        for source_name_key in range(source_name_count):
+                            epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1))
+                            msg="HBT:eppc query is",epoc_query
+                            _logger.info(msg)
+                            cur.execute(epoc_query)
+                            row = cur.fetchall()
+                            if (len(row)==0):
+                                continue
+                            db_srcname = row[0][0]
+                            if (db_srcname == srcname):
+                                msg="HBT: Update vnf_table_2 : ",source_name_key, row
+                                _logger.info(msg)
+                                update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1))
+                                cur.execute(update_query)
+                                source_name_key = source_name_count
+                                break
+                            else:
+                                continue
+                        msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count
+                        _logger.info(msg)
+                        if (source_name_count == (source_name_key+1)):
+                            source_name_key = source_name_count+1
+                            msg="HBT: Insert entry in table_2 : ",row
+                            _logger.info(msg)
+                            insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+                            cur.execute(insert_query)
+                            update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+                            cur.execute(update_query)
+                else:
+                    _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
+                commit_db(connection_db)
+        commit_and_close_db(connection_db)
+        if(os.getenv('pytest', "") != 'test'):
+            cur.close()
+
+def postgres_db_open(username,password,host,port,database_name):
+
+    if(os.getenv('pytest', "") == 'test'):
+        return True
+    connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+    return connection
-# Process the heartbeat event on input topic
-def periodic_event():
-    global periodic_scheduler
-    global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
-    ret = 0
-    #print("Args are :", locals())
-    print("{0} Checking...".format(datetime.datetime.now()))
-    #Read heartbeat
-    #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
-    get_url = mr_url+'/DefaultGroup/1?timeout=15000'
-    print("Getting :"+get_url)
-    try:
-        res = requests.get(get_url)
-        #print(res)
-        #print(res.headers)
-        print(res.text)
-        #print(res.json)
-        inputString = res.text
-        #jlist = json.loads(inputString)
-        jlist = inputString.split('\n');
-        #print("List:"+jlist[0])
-        # Process the DMaaP input message retreived
-        for line in jlist:
-            print("Line:"+line)
-            try:
-                jobj = json.loads(line)
-            except ValueError:
-                print('Decoding JSON has failed')
-                continue
-            #print(jobj)
-            srcid = (jobj['event']['commonEventHeader']['sourceId'])
-            lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec'])
-            seqnum = (jobj['event']['commonEventHeader']['sequence'])
-            nfcode = (jobj['event']['commonEventHeader']['nfNamingCode'])
-            if( nfcode and nfc != nfcode):
-                continue
-            if( srcid in hearttrack ):
-                tdiff = lastepo - hearttrack[srcid]
-                sdiff = seqnum - heartstate[srcid]
-                print("Existing source time diff :"+str(tdiff)+" seqdiff :"+str(sdiff))
-                # check time difference is within limits and seq num is less than allowed
-                if((0 <= tdiff <= 61000000) and sdiff < missing_htbt):
-                    print("Heartbeat Alive...")
-                    hearttrack[srcid] = lastepo
-                    heartstate[srcid] = seqnum;
-                    heartflag[srcid] = sdiff;
-                    heartmsg[srcid] = jobj;
-                else:
-                    jobj["internalHeaderFields"] = json.dumps({
-                        "closedLoopFlag": "True",
-                        "eventTag": "hp.Heartbeat Service.20171022.8447964515",
-                        "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
-                        "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
-                        "closedLoopControlName": cl_loop,
-                        "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
-                    });
-                    heartmsg[srcid] = jobj;
-                    payload = heartmsg[srcid]
-                    print(payload)
-                    #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
-                    psend_url = pol_url+'/DefaultGroup/1?timeout=15000'
-                    print(psend_url)
-                    print("Heartbeat Dead raising alarm event "+psend_url)
-                    #Send response for policy on output topic
-                    r = requests.post(psend_url, data=payload)
-                    print(r.status_code, r.reason)
-                    ret = r.status_code
-                    del heartstate[srcid]
-                    del hearttrack[srcid]
-                    del heartflag[srcid]
-            else:
-                print("Adding new source")
-                hearttrack[srcid] = lastepo
-                heartstate[srcid] = seqnum
-                heartflag[srcid] = 1
-                heartmsg[srcid] = jobj;
-            ret = 1
-        chkeys = []
-        for key in heartstate.keys():
-            print(key,heartstate[key])
-            if( heartflag[key] == 0 ):
-                print("Heartbeat Dead raise alarm event"+key)
-                chkeys.append( key )
-                #print payload
-                heartmsg[key]["internalHeaderFields"] = json.dumps({
-                    "closedLoopFlag": "True",
-                    "eventTag": "hp.Heartbeat Service.20171022.8447964515",
-                    "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
-                    "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
-                    "closedLoopControlName": cl_loop,
-                    "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
-                })
-                payload = heartmsg[key]
-                print(payload)
-                send_url = pol_url+'/DefaultGroup/1?timeout=15000'
-                print(send_url)
-                r = requests.post(send_url, data=payload)
-                print(r.status_code, r.reason)
-                ret = r.status_code
-            heartflag[key] = 0
-        for chkey in chkeys:
-            print(chkey)
-            del heartstate[chkey]
-            del hearttrack[chkey]
-            del heartflag[chkey]
-    except requests.exceptions.ConnectionError:
-        print("Connection refused ..")
-    return ret
+def db_table_creation_check(connection_db,table_name):
+    if(os.getenv('pytest', "") == 'test'):
+        return True
+    try:
+        cur = connection_db.cursor()
+        query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+        cur.execute(query_db)
+        database_names = cur.fetchone()
+        if(database_names is not None):
+            if(table_name in database_names):
+                return True
+        else:
+            return False
+
+
+    except (psycopg2.DatabaseError, e):
+        msg = 'COMMON:Error %s' % e
+        _logger.error(msg)
+    finally:
+        cur.close()
-#test setup for coverage
-def test_setup(args):
-    global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
-    missing_htbt = float(int(args[2]))
-    intvl = float(int(args[3]))
-    intopic = args[4]
-    outopic = args[5]
-    mr_url = get_collector_uri()+'/events/'+intopic
-    pol_url = get_policy_uri()+'/events/'+outopic
-    print ("Message router url %s " % mr_url)
-    print ("Policy url %s " % pol_url)
-    print ("Interval %s " % intvl)
-    print ("Input topic %s " % intopic)
-    print ("Output topic %s " % outopic)
-    #intvl = 60 # every second
+def commit_db(connection_db):
+    if(os.getenv('pytest', "") == 'test'):
+        return True
+    try:
+        connection_db.commit() # <--- makes sure the change is shown in the database
+        return True
+    except(psycopg2.DatabaseError, e):
+        msg = 'COMMON:Error %s' % e
+        _logger.error(msg)
+        return False
-#Main invocation
-def main(args):
-    global periodic_scheduler
-    global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
-    #mr_url = get_collector_uri()
-    #pol_url = get_policy_uri()
-    mr_url = args[0]
-    intopic = args[1]
-    pol_url = args[2]
-    outopic = args[3]
-    nfc = args[4]
-    missing_htbt = int(args[5])
-    intvl = int(args[6])
-    cl_loop = args[7]
-    print ("Message router url %s " % mr_url)
-    print ("Policy router url %s " % pol_url)
-    print ("VNF %s " % nfc)
-    print ("Interval %s " % intvl)
-    if( cl_loop != "internal_test") :
-        #intvl = 60 # every second
-        #Start periodic scheduler runs every interval
-        periodic_scheduler = PeriodicScheduler()
-        periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once
-        periodic_scheduler.run() # it starts the scheduler
+def commit_and_close_db(connection_db):
+    if(os.getenv('pytest', "") == 'test'):
+        return True
+    try:
+        connection_db.commit() # <--- makes sure the change is shown in the database
+        connection_db.close()
+        return True
+    except(psycopg2.DatabaseError, e):
+        msg = 'COMMON:Error %s' % e
+        _logger.error(msg)
+        return False
-if __name__ == "__main__":
-    total = len(sys.argv)
-    cmdargs = str(sys.argv)
-    print ("The total numbers of args passed to the script: %d " % total)
-    print ("Missing Heartbeat Args list: %s " % cmdargs)
-    print ("Script name: %s" % str(sys.argv[0]))
-    for i in range(total):
-        print ("Argument # %d : %s" % (i, str(sys.argv[i])))
-    main(sys.argv[1:])
-
-
-#force stop scheduler
-def stop():
-    global periodic_scheduler
-    if not periodic_scheduler is None:
-        periodic_scheduler.stop()
+if __name__ == '__main__':
+    jsfile = sys.argv[1]
+    msg="HBT:HeartBeat thread Created"
+    _logger.info("HBT:HeartBeat thread Created")
+    msg="HBT:The config file name passed is -%s", jsfile
+    _logger.info(msg)
+    ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties()
+    process_msg(jsfile,user_name, password, ip_address, port_num, db_name)
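Note: as the new __main__ block above shows, the reworked worker is launched with a single argument, the path to the DCAE-provided config JSON (for example, python3 htbtworker.py /path/to/config.json), and process_msg() reads only four fields from each VES heartbeat's commonEventHeader. The sketch below is a minimal, hypothetical payload that exercises those fields; the eventName value comes from the test list in the diff, while the sourceName and timestamp values are illustrative assumptions and not part of this commit.

# Hypothetical heartbeat payload for local experimentation; field names mirror
# what process_msg() reads from event['commonEventHeader']. Values are examples only.
import json
import time

heartbeat = {
    "event": {
        "commonEventHeader": {
            "eventName": "Heartbeat_vDNS",        # must exist in vnf_table_1 to be tracked
            "sourceName": "vnf-instance-01",      # hypothetical VNF instance name
            "lastEpochMicrosec": int(time.time() * 1000000),
            "sequence": 1
        }
    }
}

# DMaaP returns each event as a JSON string in the GET response body; process_msg()
# splits that body on newlines and json.loads() each line, then each item in it.
print(json.dumps(heartbeat))

A payload shaped like this is also what read_json_file() feeds the worker from tests/test1.json through tests/test3.json when the pytest environment variable is set to 'test'.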