aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2022-11-17 14:51:38 -0500
committerVijay Venkatesh Kumar <vv770d@att.com>2023-01-05 17:10:16 -0500
commit341b5bb2347c30344662675936b90b325efe5520 (patch)
treec5fa256f77cae915bd758a060b69e53c4039e7c1 /miss_htbt_service/htbtworker.py
parent2e840627a6b01475eb98b52f0a45593b4f2b8641 (diff)
Heartbeat code refactoring
code optimization & test improvement Issue-ID: DCAEGEN2-2953 Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com> Change-Id: I99229d966c13ad666ac994ab5a582aeeaa306639 Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--miss_htbt_service/htbtworker.py384
1 files changed, 203 insertions, 181 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 6765266..3328973 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Fujitsu Ltd.
@@ -17,11 +17,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# Author Prakash Hosangady(ph553f@att.com)
-# Simple Microservice
-# Tracks Heartbeat messages on input topic in DMaaP
-# and poppulate the information in postgres DB
+
import logging
import psycopg2
@@ -31,55 +27,45 @@ import os.path as path
import json
import sys
import time
+
import misshtbtd as db
import get_logger
_logger = logging.getLogger(__name__)
+configjsonfile = "../etc/config.json"
-def read_json_file(i, prefix="../../tests"):
- if i == 0:
- with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile:
- cfg = json.load(outfile)
- elif i == 1:
- with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
- cfg = json.load(outfile)
- elif i == 2:
- with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile:
- cfg = json.load(outfile)
- return cfg
-
-
-def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
+def process_msg(cfgjsonfile, number_of_iterations=-1):
+ """
+ Function to poll events from MR continuously
+ and determine if matching configuration to be
+ tracked for missed HB function
+ """
+
global mr_url
- i = 0
+ global configjsonfile
+ configjsonfile = cfgjsonfile
sleep_duration = 20
- while True:
+ reconfig_Flag = False
+ while number_of_iterations != 0:
+ number_of_iterations -= 1
time.sleep(sleep_duration)
- with open(jsfile, "r") as outfile:
+ print ("*** CFG json file info " + configjsonfile)
+ with open(configjsonfile, "r") as outfile:
cfg = json.load(outfile)
+
+ print ("*** CFG info " + str(cfg))
mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
+
+ username = str(cfg["pg_userName"])
+ password = str(cfg["pg_passwd"])
+ db_host = str(cfg["pg_ipAddress"])
+ db_port = cfg["pg_portNum"]
+ database_name = str(cfg["pg_dbName"])
+
+ reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name)
+ eventname_list = get_eventnamelist ()
- while True:
- hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
- user_name, password, ip_address, port_num, db_name
- )
- if hbc_state == "RECONFIGURATION":
- _logger.info("HBT:Waiting for hb_common state to become RUNNING")
- time.sleep(10)
- else:
- break
-
- if os.getenv("pytest", "") == "test":
- eventname_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
- connection_db = 0
- else:
- connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
- cur = connection_db.cursor()
- cur.execute("SELECT event_name FROM vnf_table_1")
- eventname_list = [item[0] for item in cur.fetchall()]
- msg = "\n\nHBT:eventnameList values ", eventname_list
- _logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
get_url = mr_url + "/DefaultGroup/1?timeout=15000"
else:
@@ -87,105 +73,60 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if os.getenv("pytest", "") == "test":
- jsonobj = read_json_file(i)
- jobj = []
- jobj.append(jsonobj)
- i = i + 1
- msg = "HBT:newly received test message", jobj
- _logger.info(msg)
- if i >= 3:
- i = 0
- break
- else:
+ try:
+ res = requests.get(get_url)
+ except Exception as e:
+ # message-router may be down temporarily. continue polling loop to try again
+ _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e)
+ time.sleep(1)
+ continue
+ _logger.info("HBT: %s", res.text)
+ input_string = res.text
+ # If mrstatus in message body indicates some information, not json msg.
+ if "mrstatus" in input_string:
+ continue
+ jlist = input_string.split("\n")
+ print (jlist)
+ # Process the DMaaP input message retrieved
+ error = False
+ jobj = []
+ for line in jlist:
try:
- res = requests.get(get_url)
- except Exception as e:
- # message-router may be down temporarily. continue polling loop to try again
- _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e)
- time.sleep(1)
- continue
- _logger.info("HBT: %s", res.text)
- input_string = res.text
- # If mrstatus in message body indicates some information, not json msg.
- if "mrstatus" in input_string:
- continue
- jlist = input_string.split("\n")
- # Process the DMaaP input message retreived
- error = False
- for line in jlist:
- try:
- jobj = json.loads(line)
- except ValueError:
- msg = "HBT:Decoding JSON has failed"
- _logger.error(msg)
- error = True
- break
- if error:
- continue
- if len(jobj) == 0:
- continue
- for item in jobj:
- try:
- if os.getenv("pytest", "") == "test":
- jitem = jsonobj
- else:
- jitem = json.loads(item)
- srcname = jitem["event"]["commonEventHeader"]["sourceName"]
- lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
- # if lastEpochMicrosec looks like microsec, align it with millisec
- if lastepo > 1000000000000000:
- lastepo = int(lastepo / 1000)
- seqnum = jitem["event"]["commonEventHeader"]["sequence"]
- event_name = jitem["event"]["commonEventHeader"]["eventName"]
- except Exception as err:
- msg = "HBT message process error - ", err
+ jobj = json.loads(line)
+ except ValueError:
+ msg = "HBT:Decoding JSON has failed"
_logger.error(msg)
- continue
+ error = True
+ break
+ if error:
+ continue
+ if len(jobj) == 0:
+ continue
+
+ _logger.info("HBT jobj Array : %s", jobj)
+ for item in jobj:
+ srcname, lastepo, seqnum, event_name = parse_event(item)
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
- if db_table_creation_check(connection_db, "vnf_table_2") is False:
- msg = "HBT:Creating vnf_table_2"
- _logger.info(msg)
- cur.execute(
- """
- CREATE TABLE vnf_table_2 (
- EVENT_NAME varchar,
- SOURCE_NAME_KEY integer,
- PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY),
- LAST_EPO_TIME BIGINT,
- SOURCE_NAME varchar,
- CL_FLAG integer
- )"""
- )
- else:
- msg = "HBT:vnf_table_2 is already there"
- _logger.info(msg)
+
+ check_and_create_vnf2_table ()
+
if event_name in eventname_list: # pragma: no cover
- if os.getenv("pytest", "") == "test":
- break
- cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (event_name,))
+ cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name)
row = cur.fetchone()
source_name_count = row[0]
source_name_key = source_name_count + 1
cl_flag = 0
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (event_name, source_name_key, lastepo, srcname, cl_flag),
- )
- cur.execute(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
- (source_name_key, event_name),
- )
- else: # pragma: no cover
+ new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
+ else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
- cur.execute(
+ cur = sql_executor(
"SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
- (event_name, (source_name_key + 1)),
+ event_name, (source_name_key + 1)
)
row = cur.fetchall()
if len(row) == 0:
@@ -194,10 +135,10 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_srcname == srcname:
msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- cur.execute(
+ sql_executor(
"UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
"WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- (lastepo, srcname, event_name, (source_name_key + 1)),
+ lastepo, srcname, event_name, (source_name_key + 1)
)
source_name_key = source_name_count
break
@@ -208,63 +149,152 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if source_name_count == (source_name_key + 1):
source_name_key = source_name_count + 1
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (event_name, source_name_key, lastepo, srcname, cl_flag),
- )
- cur.execute(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- (source_name_key, event_name),
- )
+ new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
else:
- _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
- commit_db(connection_db)
- commit_and_close_db(connection_db)
- if os.getenv("pytest", "") != "test":
- cur.close()
+ _logger.info("HBT:eventName is not being monitored, Ignoring JSON message")
+ msg = "HBT: Looping to check for new events from DMAAP"
+ print ("HBT: Looping to check for new events from DMAAP")
+ _logger.info(msg)
+def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag):
+ """
+ Wrapper function to update event to tracking tables
+ """
+
+ sql_executor(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ event_name, source_name_key, lastepo, srcname, cl_flag
+ )
+ sql_executor(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
+ source_name_key, event_name
+ )
-def postgres_db_open(username, password, host, port, database_name):
- if os.getenv("pytest", "") == "test":
- return True
- connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
- return connection
+def parse_event (jsonstring):
+ """
+ Function to parse incoming event as json object
+ and parse out required attributes
+ """
+ _logger.info("HBT jsonstring: %s", jsonstring)
+ #convert string to object
+ jitem = json.loads(jsonstring)
+
+ try:
+ srcname = jitem["event"]["commonEventHeader"]["sourceName"]
+ lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
+ # if lastEpochMicrosec looks like microsec, align it with millisec
+ if lastepo > 1000000000000000:
+ lastepo = int(lastepo / 1000)
+ seqnum = jitem["event"]["commonEventHeader"]["sequence"]
+ event_name = jitem["event"]["commonEventHeader"]["eventName"]
+ msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
+ _logger.info(msg)
+ return (srcname,lastepo,seqnum,event_name)
+ except Exception as err:
+ msg = "HBT message process error - ", err
+ _logger.error(msg)
+def get_eventnamelist ():
+ """
+ Function to fetch active monitored eventname list
+ """
+ cur = sql_executor("SELECT event_name FROM vnf_table_1",)
+ eventname_list = [item[0] for item in cur.fetchall()]
+ msg = "\n\nHBT:eventnameList values ", eventname_list
+ _logger.info(msg)
+ return eventname_list
-def db_table_creation_check(connection_db, table_name):
- if os.getenv("pytest", "") == "test":
- return True
+def check_and_create_vnf2_table ():
+ """
+ Check and create vnf_table_2 used for tracking HB entries
+ """
try:
- cur = connection_db.cursor()
- cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,))
+ table_exist = False
+ cur = sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
database_names = cur.fetchone()
if database_names is not None:
- if table_name in database_names:
- return True
+ if "vnf_table_2" in database_names:
+ table_exist = True
+
+ if table_exist == False:
+ msg = "HBT:Creating vnf_table_2"
+ _logger.info(msg)
+ sql_executor(
+ """
+ CREATE TABLE vnf_table_2 (
+ EVENT_NAME varchar,
+ SOURCE_NAME_KEY integer,
+ PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY),
+ LAST_EPO_TIME BIGINT,
+ SOURCE_NAME varchar,
+ CL_FLAG integer
+ )"""
+ ,)
else:
- return False
- except psycopg2.DatabaseError as e:
- msg = "COMMON:Error %s" % e
- _logger.error(msg)
- finally:
- cur.close()
-
-
-def commit_db(connection_db):
- if os.getenv("pytest", "") == "test":
- return True
- try:
- connection_db.commit() # <--- makes sure the change is shown in the database
+ msg = "HBT:vnf_table_2 is already there"
+ _logger.info(msg)
return True
except psycopg2.DatabaseError as e:
msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
+
+def check_process_reconfiguration(username, password, host, port, database_name):
+ """
+ Verify if DB configuration in progress
+ """
+ while True:
+ hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
+ username, password, host, port, database_name
+ )
+ if hbc_state == "RECONFIGURATION":
+ _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING")
+ time.sleep(10)
+ else:
+ _logger.info("HBT: hb_common state is RUNNING")
+ break
+ return False
+
+def postgres_db_open():
+ """
+ Wrapper function for returning DB connection object
+ """
+
+ global configjsonfile
+
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(configjsonfile)
+ connection = psycopg2.connect(database=db_name, user=user_name, password=password, host=ip_address, port=port_num)
+ return connection
+
+
+def sql_executor (query, *values):
+ """
+ wrapper method for DB operation
+ """
+ _logger.info("HBT query: %s values: %s", query, values)
+
+ connection_db = postgres_db_open()
+ cur = connection_db.cursor()
+ cur.execute(query, values)
+ update_commands = ["CREATE", "INSERT", "UPDATE"]
+ if any (x in query for x in update_commands):
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
+ return cur
def commit_and_close_db(connection_db):
- if os.getenv("pytest", "") == "test":
- return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
@@ -277,18 +307,10 @@ def commit_and_close_db(connection_db):
if __name__ == "__main__":
get_logger.configure_logger("htbtworker")
- jsfile = sys.argv[1]
+ configjsonfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
- msg = "HBT:The config file name passed is -%s", jsfile
+ msg = "HBT:The config file name passed is -%s", configjsonfile
_logger.info(msg)
- (
- ip_address,
- port_num,
- user_name,
- password,
- db_name,
- cbs_polling_required,
- cbs_polling_interval,
- ) = db.read_hb_properties(jsfile)
- process_msg(jsfile, user_name, password, ip_address, port_num, db_name)
+ process_msg(configjsonfile)
+