aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/htbtworker.py422
1 files changed, 220 insertions, 202 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 6123386..5b62943 100755..100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,217 +13,235 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-# Author Gokul Singaraju gs244f@att.com
+# Author Prakash Hosangady(ph553f@att.com)
# Simple Microservice
# Tracks Heartbeat messages on input topic in DMaaP
-# and generates Missing Heartbeat signal for Policy Engine
+# and poppulate the information in postgres DB
+import psycopg2
import requests
-import math
-import sched, datetime, time
-import json
-import string
-import sys
+import os
+import json,sys,time
+import misshtbtd as db
+import logging
+import get_logger
+import os.path as path
+_logger = get_logger.get_logger(__name__)
-# Initialise tracking hash tables
-intvl = 60
-missing_htbt = 2
-#tracks last epoch time
-hearttrack = {}
-#tracks sequence number
-heartstate = {}
-#tracks sequence number differences
-heartflag = {}
-#saves heartbeat message for policy
-heartmsg = {}
-mr_url = 'http://mrrouter.onap.org:3904'
-pol_url = 'http://mrrouter.onap.org:3904'
-intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
-outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
-nfc = "vVNF"
-cl_loop = 'ControlLoopEvent1'
-periodic_scheduler = None
+def read_json_file(i):
+ if (i==0):
+ with open (path.abspath(path.join(__file__, "../../tests/test1.json")), "r") as outfile:
+ cfg = json.load(outfile)
+ elif (i == 1):
+ with open (path.abspath(path.join(__file__, "../../tests/test2.json")), "r") as outfile:
+ cfg = json.load(outfile)
+ elif (i ==2):
+ with open( path.abspath(path.join(__file__, "../../tests/test3.json")), 'r') as outfile:
+ cfg = json.load(outfile)
+ return cfg
-# Checks for heartbeat event on periodic basis
-class PeriodicScheduler(object):
- def __init__(self):
- self.scheduler = sched.scheduler(time.time, time.sleep)
-
- def setup(self, interval, action, actionargs=()):
- #print("Args are :", locals())
- action(*actionargs)
- self.scheduler.enter(interval, 1, self.setup,(interval, action, actionargs))
- def run(self):
- self.scheduler.run()
+def process_msg(jsfile,user_name, password, ip_address, port_num, db_name):
+ global mr_url
+ i=0
+ sleep_duration = 10
+ while(True):
+ time.sleep(sleep_duration)
+ with open(jsfile, 'r') as outfile:
+ cfg = json.load(outfile)
+ mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'])
- def stop(self):
- list(map(self.scheduler.cancel, self.scheduler.queue))
+ while(True):
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ if(hbc_state == "RECONFIGURATION"):
+ _logger.info("HBT:Waiting for hb_common state to become RUNNING")
+ time.sleep(10)
+ else:
+ break
+
+ if(os.getenv('pytest', "") == 'test'):
+ eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"]
+ else:
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ cur = connection_db.cursor()
+ db_query = "Select event_name from vnf_table_1"
+ cur.execute(db_query)
+ eventnameList = [item[0] for item in cur.fetchall()]
+ msg="\n\nHBT:eventnameList values ", eventnameList
+ _logger.info(msg)
+ if "groupID" not in os.environ or "consumerID" not in os.environ:
+ get_url = mr_url + 'DefaultGroup/1?timeout=15000'
+ else:
+ get_url = mr_url + os.getenv('groupID') + '/' + os.getenv('consumerID') + '?timeout=15000'
+ msg="HBT:Getting :"+get_url
+ _logger.info(msg)
+ if(os.getenv('pytest', "") == 'test'):
+ jsonobj = read_json_file(i)
+ jobj = []
+ jobj.append(jsonobj)
+ i=i+1
+ msg="HBT:newly received test message", jobj
+ _logger.info(msg)
+ if (i >= 3):
+ i=0
+ break
+ else:
+ res = requests.get(get_url)
+ msg="HBT:",res.text
+ _logger.info(msg)
+ inputString = res.text
+ #If mrstatus in message body indicates some information, not json msg.
+ if ("mrstatus" in inputString):
+ if (sleep_duration < 60):
+ sleep_duration = sleep_duration + 10
+ continue
+ jlist = inputString.split('\n');
+ # Process the DMaaP input message retreived
+ for line in jlist:
+ try:
+ jobj = json.loads(line)
+ except ValueError:
+ msg='HBT:Decoding JSON has failed'
+ _logger.error(msg)
+ continue
+ if len(jobj) == 0:
+ continue
+ for item in jobj:
+ try:
+ if(os.getenv('pytest', "") == 'test'):
+ jitem = jsonobj
+ else:
+ jitem = json.loads(item)
+ srcname = (jitem['event']['commonEventHeader']['sourceName'])
+ lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+ seqnum = (jitem['event']['commonEventHeader']['sequence'])
+ eventName = (jitem['event']['commonEventHeader']['eventName'])
+ except(Exception) as err:
+ msg = "HBT message process error - ",err
+ _logger.error(msg)
+ continue
+ msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname
+ _logger.info(msg)
+ if(db_table_creation_check(connection_db,"vnf_table_2") ==False):
+ msg="HBT:Creating vnf_table_2"
+ _logger.info(msg)
+ cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);")
+ else:
+ msg="HBT:vnf_table_2 is already there"
+ _logger.info(msg)
+ if(eventName in eventnameList):
+ db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName)
+ msg="HBT:",db_query
+ _logger.info(msg)
+ if(os.getenv('pytest', "") == 'test'):
+ break
+ cur.execute(db_query)
+ row = cur.fetchone()
+ source_name_count = row[0]
+ source_name_key = source_name_count+1
+ cl_flag = 0
+ if(source_name_count==0):
+ msg="HBT: Insert entry in table_2,source_name_count=0 : ",row
+ _logger.info(msg)
+ query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+ cur.execute(query_value)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+ cur.execute(update_query)
+ else:
+ msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count
+ _logger.info(msg)
+ for source_name_key in range(source_name_count):
+ epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1))
+ msg="HBT:eppc query is",epoc_query
+ _logger.info(msg)
+ cur.execute(epoc_query)
+ row = cur.fetchall()
+ if (len(row)==0):
+ continue
+ db_srcname = row[0][0]
+ if (db_srcname == srcname):
+ msg="HBT: Update vnf_table_2 : ",source_name_key, row
+ _logger.info(msg)
+ update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1))
+ cur.execute(update_query)
+ source_name_key = source_name_count
+ break
+ else:
+ continue
+ msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count
+ _logger.info(msg)
+ if (source_name_count == (source_name_key+1)):
+ source_name_key = source_name_count+1
+ msg="HBT: Insert entry in table_2 : ",row
+ _logger.info(msg)
+ insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+ cur.execute(insert_query)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+ cur.execute(update_query)
+ else:
+ _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
+ commit_db(connection_db)
+ commit_and_close_db(connection_db)
+ if(os.getenv('pytest', "") != 'test'):
+ cur.close()
+
+def postgres_db_open(username,password,host,port,database_name):
+
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+ return connection
-# Process the heartbeat event on input topic
-def periodic_event():
- global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
- ret = 0
- #print("Args are :", locals())
- print("{0} Checking...".format(datetime.datetime.now()))
- #Read heartbeat
- #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
- get_url = mr_url+'/DefaultGroup/1?timeout=15000'
- print("Getting :"+get_url)
- try:
- res = requests.get(get_url)
- #print(res)
- #print(res.headers)
- print(res.text)
- #print(res.json)
- inputString = res.text
- #jlist = json.loads(inputString)
- jlist = inputString.split('\n');
- #print("List:"+jlist[0])
- # Process the DMaaP input message retreived
- for line in jlist:
- print("Line:"+line)
- try:
- jobj = json.loads(line)
- except ValueError:
- print('Decoding JSON has failed')
- continue
- #print(jobj)
- srcid = (jobj['event']['commonEventHeader']['sourceId'])
- lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec'])
- seqnum = (jobj['event']['commonEventHeader']['sequence'])
- nfcode = (jobj['event']['commonEventHeader']['nfNamingCode'])
- if( nfcode and nfc != nfcode):
- continue
- if( srcid in hearttrack ):
- tdiff = lastepo - hearttrack[srcid]
- sdiff = seqnum - heartstate[srcid]
- print("Existing source time diff :"+str(tdiff)+" seqdiff :"+str(sdiff))
- # check time difference is within limits and seq num is less than allowed
- if((0 <= tdiff <= 61000000) and sdiff < missing_htbt):
- print("Heartbeat Alive...")
- hearttrack[srcid] = lastepo
- heartstate[srcid] = seqnum;
- heartflag[srcid] = sdiff;
- heartmsg[srcid] = jobj;
- else:
- jobj["internalHeaderFields"] = json.dumps({
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": cl_loop,
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- });
- heartmsg[srcid] = jobj;
- payload = heartmsg[srcid]
- print(payload)
- #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
- psend_url = pol_url+'/DefaultGroup/1?timeout=15000'
- print(psend_url)
- print("Heartbeat Dead raising alarm event "+psend_url)
- #Send response for policy on output topic
- r = requests.post(psend_url, data=payload)
- print(r.status_code, r.reason)
- ret = r.status_code
- del heartstate[srcid]
- del hearttrack[srcid]
- del heartflag[srcid]
- else:
- print("Adding new source")
- hearttrack[srcid] = lastepo
- heartstate[srcid] = seqnum
- heartflag[srcid] = 1
- heartmsg[srcid] = jobj;
- ret = 1
- chkeys = []
- for key in heartstate.keys():
- print(key,heartstate[key])
- if( heartflag[key] == 0 ):
- print("Heartbeat Dead raise alarm event"+key)
- chkeys.append( key )
- #print payload
- heartmsg[key]["internalHeaderFields"] = json.dumps({
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": cl_loop,
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- })
- payload = heartmsg[key]
- print(payload)
- send_url = pol_url+'/DefaultGroup/1?timeout=15000'
- print(send_url)
- r = requests.post(send_url, data=payload)
- print(r.status_code, r.reason)
- ret = r.status_code
- heartflag[key] = 0
- for chkey in chkeys:
- print(chkey)
- del heartstate[chkey]
- del hearttrack[chkey]
- del heartflag[chkey]
- except requests.exceptions.ConnectionError:
- print("Connection refused ..")
- return ret
+def db_table_creation_check(connection_db,table_name):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ cur = connection_db.cursor()
+ query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+ cur.execute(query_db)
+ database_names = cur.fetchone()
+ if(database_names is not None):
+ if(table_name in database_names):
+ return True
+ else:
+ return False
+
+
+ except (psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ finally:
+ cur.close()
-#test setup for coverage
-def test_setup(args):
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
- missing_htbt = float(int(args[2]))
- intvl = float(int(args[3]))
- intopic = args[4]
- outopic = args[5]
- mr_url = get_collector_uri()+'/events/'+intopic
- pol_url = get_policy_uri()+'/events/'+outopic
- print ("Message router url %s " % mr_url)
- print ("Policy url %s " % pol_url)
- print ("Interval %s " % intvl)
- print ("Input topic %s " % intopic)
- print ("Output topic %s " % outopic)
- #intvl = 60 # every second
+def commit_db(connection_db):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ return True
+ except(psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ return False
-#Main invocation
-def main(args):
- global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
- #mr_url = get_collector_uri()
- #pol_url = get_policy_uri()
- mr_url = args[0]
- intopic = args[1]
- pol_url = args[2]
- outopic = args[3]
- nfc = args[4]
- missing_htbt = int(args[5])
- intvl = int(args[6])
- cl_loop = args[7]
- print ("Message router url %s " % mr_url)
- print ("Policy router url %s " % pol_url)
- print ("VNF %s " % nfc)
- print ("Interval %s " % intvl)
- if( cl_loop != "internal_test") :
- #intvl = 60 # every second
- #Start periodic scheduler runs every interval
- periodic_scheduler = PeriodicScheduler()
- periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once
- periodic_scheduler.run() # it starts the scheduler
+def commit_and_close_db(connection_db):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.close()
+ return True
+ except(psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ return False
-if __name__ == "__main__":
- total = len(sys.argv)
- cmdargs = str(sys.argv)
- print ("The total numbers of args passed to the script: %d " % total)
- print ("Missing Heartbeat Args list: %s " % cmdargs)
- print ("Script name: %s" % str(sys.argv[0]))
- for i in range(total):
- print ("Argument # %d : %s" % (i, str(sys.argv[i])))
- main(sys.argv[1:])
-
-
-#force stop scheduler
-def stop():
- global periodic_scheduler
- if not periodic_scheduler is None:
- periodic_scheduler.stop()
+if __name__ == '__main__':
+ jsfile = sys.argv[1]
+ msg="HBT:HeartBeat thread Created"
+ _logger.info("HBT:HeartBeat thread Created")
+ msg="HBT:The config file name passed is -%s", jsfile
+ _logger.info(msg)
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties()
+ process_msg(jsfile,user_name, password, ip_address, port_num, db_name)