aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--miss_htbt_service/htbtworker.py138
1 files changed, 72 insertions, 66 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 3328973..22155a3 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -35,13 +35,14 @@ _logger = logging.getLogger(__name__)
configjsonfile = "../etc/config.json"
+
def process_msg(cfgjsonfile, number_of_iterations=-1):
"""
- Function to poll events from MR continuously
- and determine if matching configuration to be
- tracked for missed HB function
+ Function to poll events from MR continuously
+ and determine if matching configuration to be
+ tracked for missed HB function
"""
-
+
global mr_url
global configjsonfile
configjsonfile = cfgjsonfile
@@ -50,21 +51,21 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
while number_of_iterations != 0:
number_of_iterations -= 1
time.sleep(sleep_duration)
- print ("*** CFG json file info " + configjsonfile)
+ print("*** CFG json file info " + configjsonfile)
with open(configjsonfile, "r") as outfile:
cfg = json.load(outfile)
-
- print ("*** CFG info " + str(cfg))
+
+ print("*** CFG info " + str(cfg))
mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
-
+
username = str(cfg["pg_userName"])
password = str(cfg["pg_passwd"])
db_host = str(cfg["pg_ipAddress"])
db_port = cfg["pg_portNum"]
database_name = str(cfg["pg_dbName"])
-
- reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name)
- eventname_list = get_eventnamelist ()
+
+ reconfig_Flag = check_process_reconfiguration(username, password, db_host, db_port, database_name)
+ eventname_list = get_eventnamelist()
if "groupID" not in os.environ or "consumerID" not in os.environ:
get_url = mr_url + "/DefaultGroup/1?timeout=15000"
@@ -86,7 +87,7 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
if "mrstatus" in input_string:
continue
jlist = input_string.split("\n")
- print (jlist)
+ print(jlist)
# Process the DMaaP input message retrieved
error = False
jobj = []
@@ -102,15 +103,15 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
continue
if len(jobj) == 0:
continue
-
+
_logger.info("HBT jobj Array : %s", jobj)
for item in jobj:
srcname, lastepo, seqnum, event_name = parse_event(item)
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
-
- check_and_create_vnf2_table ()
-
+
+ check_and_create_vnf2_table()
+
if event_name in eventname_list: # pragma: no cover
cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name)
row = cur.fetchone()
@@ -120,13 +121,14 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
- else: # pragma: no cover
+ else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
cur = sql_executor(
"SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
- event_name, (source_name_key + 1)
+ event_name,
+ (source_name_key + 1),
)
row = cur.fetchall()
if len(row) == 0:
@@ -138,7 +140,10 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
sql_executor(
"UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
"WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- lastepo, srcname, event_name, (source_name_key + 1)
+ lastepo,
+ srcname,
+ event_name,
+ (source_name_key + 1),
)
source_name_key = source_name_count
break
@@ -153,32 +158,30 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
else:
_logger.info("HBT:eventName is not being monitored, Ignoring JSON message")
msg = "HBT: Looping to check for new events from DMAAP"
- print ("HBT: Looping to check for new events from DMAAP")
+ print("HBT: Looping to check for new events from DMAAP")
_logger.info(msg)
-def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag):
- """
- Wrapper function to update event to tracking tables
- """
-
- sql_executor(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- event_name, source_name_key, lastepo, srcname, cl_flag
- )
- sql_executor(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- source_name_key, event_name
- )
-
-def parse_event (jsonstring):
+
+def new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag):
+ """
+ Wrapper function to update event to tracking tables
+ """
+
+ sql_executor(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", event_name, source_name_key, lastepo, srcname, cl_flag
+ )
+ sql_executor("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", source_name_key, event_name)
+
+
+def parse_event(jsonstring):
"""
- Function to parse incoming event as json object
+ Function to parse incoming event as json object
and parse out required attributes
"""
- _logger.info("HBT jsonstring: %s", jsonstring)
- #convert string to object
+ _logger.info("HBT jsonstring: %s", jsonstring)
+ # convert string to object
jitem = json.loads(jsonstring)
-
+
try:
srcname = jitem["event"]["commonEventHeader"]["sourceName"]
lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
@@ -189,22 +192,26 @@ def parse_event (jsonstring):
event_name = jitem["event"]["commonEventHeader"]["eventName"]
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
- return (srcname,lastepo,seqnum,event_name)
+ return (srcname, lastepo, seqnum, event_name)
except Exception as err:
msg = "HBT message process error - ", err
_logger.error(msg)
-def get_eventnamelist ():
+
+def get_eventnamelist():
"""
Function to fetch active monitored eventname list
"""
- cur = sql_executor("SELECT event_name FROM vnf_table_1",)
+ cur = sql_executor(
+ "SELECT event_name FROM vnf_table_1",
+ )
eventname_list = [item[0] for item in cur.fetchall()]
msg = "\n\nHBT:eventnameList values ", eventname_list
_logger.info(msg)
return eventname_list
-def check_and_create_vnf2_table ():
+
+def check_and_create_vnf2_table():
"""
Check and create vnf_table_2 used for tracking HB entries
"""
@@ -215,7 +222,7 @@ def check_and_create_vnf2_table ():
if database_names is not None:
if "vnf_table_2" in database_names:
table_exist = True
-
+
if table_exist == False:
msg = "HBT:Creating vnf_table_2"
_logger.info(msg)
@@ -228,8 +235,8 @@ def check_and_create_vnf2_table ():
LAST_EPO_TIME BIGINT,
SOURCE_NAME varchar,
CL_FLAG integer
- )"""
- ,)
+ )""",
+ )
else:
msg = "HBT:vnf_table_2 is already there"
_logger.info(msg)
@@ -238,15 +245,14 @@ def check_and_create_vnf2_table ():
msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
-
+
+
def check_process_reconfiguration(username, password, host, port, database_name):
"""
Verify if DB configuration in progress
"""
while True:
- hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
- username, password, host, port, database_name
- )
+ hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(username, password, host, port, database_name)
if hbc_state == "RECONFIGURATION":
_logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING")
time.sleep(10)
@@ -255,12 +261,13 @@ def check_process_reconfiguration(username, password, host, port, database_name)
break
return False
+
def postgres_db_open():
"""
- Wrapper function for returning DB connection object
+ Wrapper function for returning DB connection object
"""
-
- global configjsonfile
+
+ global configjsonfile
(
ip_address,
@@ -275,22 +282,22 @@ def postgres_db_open():
return connection
-def sql_executor (query, *values):
+def sql_executor(query, *values):
"""
wrapper method for DB operation
- """
- _logger.info("HBT query: %s values: %s", query, values)
-
+ """
+ _logger.info("HBT query: %s values: %s", query, values)
+
connection_db = postgres_db_open()
cur = connection_db.cursor()
- cur.execute(query, values)
+ cur.execute(query, values)
update_commands = ["CREATE", "INSERT", "UPDATE"]
- if any (x in query for x in update_commands):
- try:
- connection_db.commit() # <--- makes sure the change is shown in the database
- except psycopg2.DatabaseError as e:
- msg = "COMMON:Error %s" % e
- _logger.error(msg)
+ if any(x in query for x in update_commands):
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
return cur
@@ -310,7 +317,6 @@ if __name__ == "__main__":
configjsonfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
- msg = "HBT:The config file name passed is -%s", configjsonfile
+ msg = "HBT:The config file name passed is -%s", configjsonfile
_logger.info(msg)
process_msg(configjsonfile)
-