From 341b5bb2347c30344662675936b90b325efe5520 Mon Sep 17 00:00:00 2001 From: Vijay Venkatesh Kumar Date: Thu, 17 Nov 2022 14:51:38 -0500 Subject: Heartbeat code refactoring code optimization & test improvement Issue-ID: DCAEGEN2-2953 Signed-off-by: Vijay Venkatesh Kumar Change-Id: I99229d966c13ad666ac994ab5a582aeeaa306639 Signed-off-by: Vijay Venkatesh Kumar --- miss_htbt_service/htbtworker.py | 384 +++++++++++++++++++++------------------- 1 file changed, 203 insertions(+), 181 deletions(-) (limited to 'miss_htbt_service/htbtworker.py') diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 6765266..3328973 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Fujitsu Ltd. @@ -17,11 +17,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# Author Prakash Hosangady(ph553f@att.com) -# Simple Microservice -# Tracks Heartbeat messages on input topic in DMaaP -# and poppulate the information in postgres DB + import logging import psycopg2 @@ -31,55 +27,45 @@ import os.path as path import json import sys import time + import misshtbtd as db import get_logger _logger = logging.getLogger(__name__) +configjsonfile = "../etc/config.json" -def read_json_file(i, prefix="../../tests"): - if i == 0: - with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: - cfg = json.load(outfile) - elif i == 1: - with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: - cfg = json.load(outfile) - elif i == 2: - with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile: - cfg = json.load(outfile) - return cfg - - -def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): +def process_msg(cfgjsonfile, number_of_iterations=-1): + """ + Function to poll events from MR continuously + and determine if matching configuration to be + tracked for missed HB function + """ + global mr_url - i = 0 + global configjsonfile + configjsonfile = cfgjsonfile sleep_duration = 20 - while True: + reconfig_Flag = False + while number_of_iterations != 0: + number_of_iterations -= 1 time.sleep(sleep_duration) - with open(jsfile, "r") as outfile: + print ("*** CFG json file info " + configjsonfile) + with open(configjsonfile, "r") as outfile: cfg = json.load(outfile) + + print ("*** CFG info " + str(cfg)) mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) + + username = str(cfg["pg_userName"]) + password = str(cfg["pg_passwd"]) + db_host = str(cfg["pg_ipAddress"]) + db_port = cfg["pg_portNum"] + database_name = str(cfg["pg_dbName"]) + + reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name) + eventname_list = get_eventnamelist () - while True: - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - user_name, password, ip_address, port_num, db_name - ) - if hbc_state == "RECONFIGURATION": - _logger.info("HBT:Waiting for hb_common state to become RUNNING") - time.sleep(10) - else: - break - - if os.getenv("pytest", "") == "test": - eventname_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] - connection_db = 0 - else: - connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) - cur = connection_db.cursor() - cur.execute("SELECT event_name FROM vnf_table_1") - eventname_list = [item[0] for item in cur.fetchall()] - msg = "\n\nHBT:eventnameList values ", eventname_list - _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: get_url = mr_url + "/DefaultGroup/1?timeout=15000" else: @@ -87,105 +73,60 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): msg = "HBT:Getting :" + get_url _logger.info(msg) - if os.getenv("pytest", "") == "test": - jsonobj = read_json_file(i) - jobj = [] - jobj.append(jsonobj) - i = i + 1 - msg = "HBT:newly received test message", jobj - _logger.info(msg) - if i >= 3: - i = 0 - break - else: + try: + res = requests.get(get_url) + except Exception as e: + # message-router may be down temporarily. continue polling loop to try again + _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e) + time.sleep(1) + continue + _logger.info("HBT: %s", res.text) + input_string = res.text + # If mrstatus in message body indicates some information, not json msg. + if "mrstatus" in input_string: + continue + jlist = input_string.split("\n") + print (jlist) + # Process the DMaaP input message retrieved + error = False + jobj = [] + for line in jlist: try: - res = requests.get(get_url) - except Exception as e: - # message-router may be down temporarily. continue polling loop to try again - _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e) - time.sleep(1) - continue - _logger.info("HBT: %s", res.text) - input_string = res.text - # If mrstatus in message body indicates some information, not json msg. - if "mrstatus" in input_string: - continue - jlist = input_string.split("\n") - # Process the DMaaP input message retreived - error = False - for line in jlist: - try: - jobj = json.loads(line) - except ValueError: - msg = "HBT:Decoding JSON has failed" - _logger.error(msg) - error = True - break - if error: - continue - if len(jobj) == 0: - continue - for item in jobj: - try: - if os.getenv("pytest", "") == "test": - jitem = jsonobj - else: - jitem = json.loads(item) - srcname = jitem["event"]["commonEventHeader"]["sourceName"] - lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] - # if lastEpochMicrosec looks like microsec, align it with millisec - if lastepo > 1000000000000000: - lastepo = int(lastepo / 1000) - seqnum = jitem["event"]["commonEventHeader"]["sequence"] - event_name = jitem["event"]["commonEventHeader"]["eventName"] - except Exception as err: - msg = "HBT message process error - ", err + jobj = json.loads(line) + except ValueError: + msg = "HBT:Decoding JSON has failed" _logger.error(msg) - continue + error = True + break + if error: + continue + if len(jobj) == 0: + continue + + _logger.info("HBT jobj Array : %s", jobj) + for item in jobj: + srcname, lastepo, seqnum, event_name = parse_event(item) msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - if db_table_creation_check(connection_db, "vnf_table_2") is False: - msg = "HBT:Creating vnf_table_2" - _logger.info(msg) - cur.execute( - """ - CREATE TABLE vnf_table_2 ( - EVENT_NAME varchar, - SOURCE_NAME_KEY integer, - PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY), - LAST_EPO_TIME BIGINT, - SOURCE_NAME varchar, - CL_FLAG integer - )""" - ) - else: - msg = "HBT:vnf_table_2 is already there" - _logger.info(msg) + + check_and_create_vnf2_table () + if event_name in eventname_list: # pragma: no cover - if os.getenv("pytest", "") == "test": - break - cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (event_name,)) + cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name) row = cur.fetchone() source_name_count = row[0] source_name_key = source_name_count + 1 cl_flag = 0 if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (event_name, source_name_key, lastepo, srcname, cl_flag), - ) - cur.execute( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", - (source_name_key, event_name), - ) - else: # pragma: no cover + new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) + else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - cur.execute( + cur = sql_executor( "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", - (event_name, (source_name_key + 1)), + event_name, (source_name_key + 1) ) row = cur.fetchall() if len(row) == 0: @@ -194,10 +135,10 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_srcname == srcname: msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - cur.execute( + sql_executor( "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - (lastepo, srcname, event_name, (source_name_key + 1)), + lastepo, srcname, event_name, (source_name_key + 1) ) source_name_key = source_name_count break @@ -208,63 +149,152 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if source_name_count == (source_name_key + 1): source_name_key = source_name_count + 1 _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (event_name, source_name_key, lastepo, srcname, cl_flag), - ) - cur.execute( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - (source_name_key, event_name), - ) + new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) else: - _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") - commit_db(connection_db) - commit_and_close_db(connection_db) - if os.getenv("pytest", "") != "test": - cur.close() + _logger.info("HBT:eventName is not being monitored, Ignoring JSON message") + msg = "HBT: Looping to check for new events from DMAAP" + print ("HBT: Looping to check for new events from DMAAP") + _logger.info(msg) +def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag): + """ + Wrapper function to update event to tracking tables + """ + + sql_executor( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + event_name, source_name_key, lastepo, srcname, cl_flag + ) + sql_executor( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + source_name_key, event_name + ) -def postgres_db_open(username, password, host, port, database_name): - if os.getenv("pytest", "") == "test": - return True - connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) - return connection +def parse_event (jsonstring): + """ + Function to parse incoming event as json object + and parse out required attributes + """ + _logger.info("HBT jsonstring: %s", jsonstring) + #convert string to object + jitem = json.loads(jsonstring) + + try: + srcname = jitem["event"]["commonEventHeader"]["sourceName"] + lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] + # if lastEpochMicrosec looks like microsec, align it with millisec + if lastepo > 1000000000000000: + lastepo = int(lastepo / 1000) + seqnum = jitem["event"]["commonEventHeader"]["sequence"] + event_name = jitem["event"]["commonEventHeader"]["eventName"] + msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname + _logger.info(msg) + return (srcname,lastepo,seqnum,event_name) + except Exception as err: + msg = "HBT message process error - ", err + _logger.error(msg) +def get_eventnamelist (): + """ + Function to fetch active monitored eventname list + """ + cur = sql_executor("SELECT event_name FROM vnf_table_1",) + eventname_list = [item[0] for item in cur.fetchall()] + msg = "\n\nHBT:eventnameList values ", eventname_list + _logger.info(msg) + return eventname_list -def db_table_creation_check(connection_db, table_name): - if os.getenv("pytest", "") == "test": - return True +def check_and_create_vnf2_table (): + """ + Check and create vnf_table_2 used for tracking HB entries + """ try: - cur = connection_db.cursor() - cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,)) + table_exist = False + cur = sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") database_names = cur.fetchone() if database_names is not None: - if table_name in database_names: - return True + if "vnf_table_2" in database_names: + table_exist = True + + if table_exist == False: + msg = "HBT:Creating vnf_table_2" + _logger.info(msg) + sql_executor( + """ + CREATE TABLE vnf_table_2 ( + EVENT_NAME varchar, + SOURCE_NAME_KEY integer, + PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY), + LAST_EPO_TIME BIGINT, + SOURCE_NAME varchar, + CL_FLAG integer + )""" + ,) else: - return False - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) - finally: - cur.close() - - -def commit_db(connection_db): - if os.getenv("pytest", "") == "test": - return True - try: - connection_db.commit() # <--- makes sure the change is shown in the database + msg = "HBT:vnf_table_2 is already there" + _logger.info(msg) return True except psycopg2.DatabaseError as e: msg = "COMMON:Error %s" % e _logger.error(msg) return False + +def check_process_reconfiguration(username, password, host, port, database_name): + """ + Verify if DB configuration in progress + """ + while True: + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( + username, password, host, port, database_name + ) + if hbc_state == "RECONFIGURATION": + _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING") + time.sleep(10) + else: + _logger.info("HBT: hb_common state is RUNNING") + break + return False + +def postgres_db_open(): + """ + Wrapper function for returning DB connection object + """ + + global configjsonfile + + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(configjsonfile) + connection = psycopg2.connect(database=db_name, user=user_name, password=password, host=ip_address, port=port_num) + return connection + + +def sql_executor (query, *values): + """ + wrapper method for DB operation + """ + _logger.info("HBT query: %s values: %s", query, values) + + connection_db = postgres_db_open() + cur = connection_db.cursor() + cur.execute(query, values) + update_commands = ["CREATE", "INSERT", "UPDATE"] + if any (x in query for x in update_commands): + try: + connection_db.commit() # <--- makes sure the change is shown in the database + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + return cur def commit_and_close_db(connection_db): - if os.getenv("pytest", "") == "test": - return True try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() @@ -277,18 +307,10 @@ def commit_and_close_db(connection_db): if __name__ == "__main__": get_logger.configure_logger("htbtworker") - jsfile = sys.argv[1] + configjsonfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg = "HBT:The config file name passed is -%s", jsfile + msg = "HBT:The config file name passed is -%s", configjsonfile _logger.info(msg) - ( - ip_address, - port_num, - user_name, - password, - db_name, - cbs_polling_required, - cbs_polling_interval, - ) = db.read_hb_properties(jsfile) - process_msg(jsfile, user_name, password, ip_address, port_num, db_name) + process_msg(configjsonfile) + -- cgit 1.2.3-korg