diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2022-11-17 14:51:38 -0500 |
---|---|---|
committer | Vijay Venkatesh Kumar <vv770d@att.com> | 2023-01-05 17:10:16 -0500 |
commit | 341b5bb2347c30344662675936b90b325efe5520 (patch) | |
tree | c5fa256f77cae915bd758a060b69e53c4039e7c1 /miss_htbt_service | |
parent | 2e840627a6b01475eb98b52f0a45593b4f2b8641 (diff) |
Heartbeat code refactoring
code optimization & test improvement
Issue-ID: DCAEGEN2-2953
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Change-Id: I99229d966c13ad666ac994ab5a582aeeaa306639
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/config.json | 1 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 43 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 384 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 83 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_exit.py (renamed from miss_htbt_service/mod/trapd_exit.py) | 6 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_get_cbs_config.py (renamed from miss_htbt_service/mod/trapd_get_cbs_config.py) | 10 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_http_session.py (renamed from miss_htbt_service/mod/trapd_http_session.py) | 4 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_io.py (renamed from miss_htbt_service/mod/trapd_io.py) | 2 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_runtime_pid.py (renamed from miss_htbt_service/mod/trapd_runtime_pid.py) | 4 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_settings.py (renamed from miss_htbt_service/mod/trapd_settings.py) | 2 | ||||
-rw-r--r-- | miss_htbt_service/mod/htbt_vnf_table.py (renamed from miss_htbt_service/mod/trapd_vnf_table.py) | 18 |
11 files changed, 283 insertions, 274 deletions
diff --git a/miss_htbt_service/config.json b/miss_htbt_service/config.json new file mode 100644 index 0000000..e853f6a --- /dev/null +++ b/miss_htbt_service/config.json @@ -0,0 +1 @@ +{"pg_ipAddress": "10.0.4.1", "pg_userName": "postgres", "pg_dbName": "postgres", "streams_subscribes": {"ves-heartbeat": {"type": "message_router", "dmaap_info": {"topic_url": "http://10.12.5.252:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT"}}}, "consumerID": "1", "CBS_polling_interval": "300", "pg_passwd": "postgres", "streams_publishes": {"dcae_cl_out": {"type": "message_router", "dmaap_info": {"topic_url": "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT"}}}, "pg_portNum": "5432", "CBS_polling_allowed": "True", "groupID": "groupID", "heartbeat_config": "{\"vnfs\": [{\"eventName\": \"Heartbeat_vDNS\",\"heartbeatcountmissed\": 3,\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\t\"policyVersion\": \"1.0.0.5\",\t\"policyName\":\"vFireWall\",\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\"target_type\": \"VNF\",\t\"target\": \"genVnfName\",\t\"version\": \"1.0\"}, {\"eventName\": \"Heartbeat_vFW\",\"heartbeatcountmissed\": 3,\t\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\"policyVersion\": \"1.0.0.5\",\"policyName\": \"vFireWall\",\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\t\"target_type\":\"VNF\",\t\"target\": \"genVnfName\",\t\"version\": \"1.0\"}, {\"eventName\": \"Heartbeat_xx\",\"heartbeatcountmissed\": 3,\t\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\"policyVersion\": \"1.0.0.5\",\"policyName\": \"vFireWall\",\t\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\"target_type\": \"VNF\",\"target\": \"genVnfName\",\"version\": \"1.0\"}]}"} diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index eac8a91..cbd8f24 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Fujitsu Ltd. @@ -30,6 +30,7 @@ import os import socket import time import requests +import psycopg2 import htbtworker as pm import misshtbtd as db import get_logger @@ -159,10 +160,6 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ while True: time.sleep(20) - env_pytest = os.getenv("pytest", "") - if env_pytest == "test": - break - try: with open(json_file, "r") as outfile: cfg = json.load(outfile) @@ -177,9 +174,9 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ ) source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) - connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = pm.postgres_db_open() cur = connection_db.cursor() - if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING": + if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING": # pragma: no cover _logger.info("DBM: Active DB Monitoring Instance") cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] @@ -224,7 +221,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ epoc_time_sec = row[0][0] src_name = row[0][1] cl_flag = row[0][2] - if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: + if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover sendControlLoopEvent( "ONSET", pol_url, @@ -244,7 +241,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ (cl_flag, event_name, (source_name_key + 1)), ) connection_db.commit() - elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: + elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover sendControlLoopEvent( "ABATED", pol_url, @@ -277,17 +274,21 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ """ else: # pragma: no cover msg = "DBM:Inactive instance or hb_common state is not RUNNING" - _logger.info(msg) - pm.commit_and_close_db(connection_db) + _logger.info(msg) + try: + connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.close() + return True + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + return False cur.close() break - -if __name__ == "__main__": +def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1): get_logger.configure_logger("db_monitoring") _logger.info("DBM: DBM Process started") - current_pid = sys.argv[1] - jsfile = sys.argv[2] ( ip_address, port_num, @@ -299,8 +300,12 @@ if __name__ == "__main__": ) = db.read_hb_properties(jsfile) msg = "DBM:Parent process ID and json file name", current_pid, jsfile _logger.info(msg) - while True: + while number_of_iterations != 0: + number_of_iterations -= 1 db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name) - env_pytest = os.getenv("pytest", "") - if env_pytest == "test": - break + + +if __name__ == "__main__": + current_pid = sys.argv[1] + jsfile = sys.argv[2] + db_monitoring_wrapper (current_pid,jsfile) diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 6765266..3328973 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Fujitsu Ltd. @@ -17,11 +17,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# Author Prakash Hosangady(ph553f@att.com) -# Simple Microservice -# Tracks Heartbeat messages on input topic in DMaaP -# and poppulate the information in postgres DB + import logging import psycopg2 @@ -31,55 +27,45 @@ import os.path as path import json import sys import time + import misshtbtd as db import get_logger _logger = logging.getLogger(__name__) +configjsonfile = "../etc/config.json" -def read_json_file(i, prefix="../../tests"): - if i == 0: - with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile: - cfg = json.load(outfile) - elif i == 1: - with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: - cfg = json.load(outfile) - elif i == 2: - with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile: - cfg = json.load(outfile) - return cfg - - -def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): +def process_msg(cfgjsonfile, number_of_iterations=-1): + """ + Function to poll events from MR continuously + and determine if matching configuration to be + tracked for missed HB function + """ + global mr_url - i = 0 + global configjsonfile + configjsonfile = cfgjsonfile sleep_duration = 20 - while True: + reconfig_Flag = False + while number_of_iterations != 0: + number_of_iterations -= 1 time.sleep(sleep_duration) - with open(jsfile, "r") as outfile: + print ("*** CFG json file info " + configjsonfile) + with open(configjsonfile, "r") as outfile: cfg = json.load(outfile) + + print ("*** CFG info " + str(cfg)) mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) + + username = str(cfg["pg_userName"]) + password = str(cfg["pg_passwd"]) + db_host = str(cfg["pg_ipAddress"]) + db_port = cfg["pg_portNum"] + database_name = str(cfg["pg_dbName"]) + + reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name) + eventname_list = get_eventnamelist () - while True: - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - user_name, password, ip_address, port_num, db_name - ) - if hbc_state == "RECONFIGURATION": - _logger.info("HBT:Waiting for hb_common state to become RUNNING") - time.sleep(10) - else: - break - - if os.getenv("pytest", "") == "test": - eventname_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] - connection_db = 0 - else: - connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) - cur = connection_db.cursor() - cur.execute("SELECT event_name FROM vnf_table_1") - eventname_list = [item[0] for item in cur.fetchall()] - msg = "\n\nHBT:eventnameList values ", eventname_list - _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: get_url = mr_url + "/DefaultGroup/1?timeout=15000" else: @@ -87,105 +73,60 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): msg = "HBT:Getting :" + get_url _logger.info(msg) - if os.getenv("pytest", "") == "test": - jsonobj = read_json_file(i) - jobj = [] - jobj.append(jsonobj) - i = i + 1 - msg = "HBT:newly received test message", jobj - _logger.info(msg) - if i >= 3: - i = 0 - break - else: + try: + res = requests.get(get_url) + except Exception as e: + # message-router may be down temporarily. continue polling loop to try again + _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e) + time.sleep(1) + continue + _logger.info("HBT: %s", res.text) + input_string = res.text + # If mrstatus in message body indicates some information, not json msg. + if "mrstatus" in input_string: + continue + jlist = input_string.split("\n") + print (jlist) + # Process the DMaaP input message retrieved + error = False + jobj = [] + for line in jlist: try: - res = requests.get(get_url) - except Exception as e: - # message-router may be down temporarily. continue polling loop to try again - _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e) - time.sleep(1) - continue - _logger.info("HBT: %s", res.text) - input_string = res.text - # If mrstatus in message body indicates some information, not json msg. - if "mrstatus" in input_string: - continue - jlist = input_string.split("\n") - # Process the DMaaP input message retreived - error = False - for line in jlist: - try: - jobj = json.loads(line) - except ValueError: - msg = "HBT:Decoding JSON has failed" - _logger.error(msg) - error = True - break - if error: - continue - if len(jobj) == 0: - continue - for item in jobj: - try: - if os.getenv("pytest", "") == "test": - jitem = jsonobj - else: - jitem = json.loads(item) - srcname = jitem["event"]["commonEventHeader"]["sourceName"] - lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] - # if lastEpochMicrosec looks like microsec, align it with millisec - if lastepo > 1000000000000000: - lastepo = int(lastepo / 1000) - seqnum = jitem["event"]["commonEventHeader"]["sequence"] - event_name = jitem["event"]["commonEventHeader"]["eventName"] - except Exception as err: - msg = "HBT message process error - ", err + jobj = json.loads(line) + except ValueError: + msg = "HBT:Decoding JSON has failed" _logger.error(msg) - continue + error = True + break + if error: + continue + if len(jobj) == 0: + continue + + _logger.info("HBT jobj Array : %s", jobj) + for item in jobj: + srcname, lastepo, seqnum, event_name = parse_event(item) msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - if db_table_creation_check(connection_db, "vnf_table_2") is False: - msg = "HBT:Creating vnf_table_2" - _logger.info(msg) - cur.execute( - """ - CREATE TABLE vnf_table_2 ( - EVENT_NAME varchar, - SOURCE_NAME_KEY integer, - PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY), - LAST_EPO_TIME BIGINT, - SOURCE_NAME varchar, - CL_FLAG integer - )""" - ) - else: - msg = "HBT:vnf_table_2 is already there" - _logger.info(msg) + + check_and_create_vnf2_table () + if event_name in eventname_list: # pragma: no cover - if os.getenv("pytest", "") == "test": - break - cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (event_name,)) + cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name) row = cur.fetchone() source_name_count = row[0] source_name_key = source_name_count + 1 cl_flag = 0 if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (event_name, source_name_key, lastepo, srcname, cl_flag), - ) - cur.execute( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", - (source_name_key, event_name), - ) - else: # pragma: no cover + new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) + else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - cur.execute( + cur = sql_executor( "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", - (event_name, (source_name_key + 1)), + event_name, (source_name_key + 1) ) row = cur.fetchall() if len(row) == 0: @@ -194,10 +135,10 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_srcname == srcname: msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - cur.execute( + sql_executor( "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - (lastepo, srcname, event_name, (source_name_key + 1)), + lastepo, srcname, event_name, (source_name_key + 1) ) source_name_key = source_name_count break @@ -208,63 +149,152 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if source_name_count == (source_name_key + 1): source_name_key = source_name_count + 1 _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (event_name, source_name_key, lastepo, srcname, cl_flag), - ) - cur.execute( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - (source_name_key, event_name), - ) + new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) else: - _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") - commit_db(connection_db) - commit_and_close_db(connection_db) - if os.getenv("pytest", "") != "test": - cur.close() + _logger.info("HBT:eventName is not being monitored, Ignoring JSON message") + msg = "HBT: Looping to check for new events from DMAAP" + print ("HBT: Looping to check for new events from DMAAP") + _logger.info(msg) +def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag): + """ + Wrapper function to update event to tracking tables + """ + + sql_executor( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + event_name, source_name_key, lastepo, srcname, cl_flag + ) + sql_executor( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + source_name_key, event_name + ) -def postgres_db_open(username, password, host, port, database_name): - if os.getenv("pytest", "") == "test": - return True - connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) - return connection +def parse_event (jsonstring): + """ + Function to parse incoming event as json object + and parse out required attributes + """ + _logger.info("HBT jsonstring: %s", jsonstring) + #convert string to object + jitem = json.loads(jsonstring) + + try: + srcname = jitem["event"]["commonEventHeader"]["sourceName"] + lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] + # if lastEpochMicrosec looks like microsec, align it with millisec + if lastepo > 1000000000000000: + lastepo = int(lastepo / 1000) + seqnum = jitem["event"]["commonEventHeader"]["sequence"] + event_name = jitem["event"]["commonEventHeader"]["eventName"] + msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname + _logger.info(msg) + return (srcname,lastepo,seqnum,event_name) + except Exception as err: + msg = "HBT message process error - ", err + _logger.error(msg) +def get_eventnamelist (): + """ + Function to fetch active monitored eventname list + """ + cur = sql_executor("SELECT event_name FROM vnf_table_1",) + eventname_list = [item[0] for item in cur.fetchall()] + msg = "\n\nHBT:eventnameList values ", eventname_list + _logger.info(msg) + return eventname_list -def db_table_creation_check(connection_db, table_name): - if os.getenv("pytest", "") == "test": - return True +def check_and_create_vnf2_table (): + """ + Check and create vnf_table_2 used for tracking HB entries + """ try: - cur = connection_db.cursor() - cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,)) + table_exist = False + cur = sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") database_names = cur.fetchone() if database_names is not None: - if table_name in database_names: - return True + if "vnf_table_2" in database_names: + table_exist = True + + if table_exist == False: + msg = "HBT:Creating vnf_table_2" + _logger.info(msg) + sql_executor( + """ + CREATE TABLE vnf_table_2 ( + EVENT_NAME varchar, + SOURCE_NAME_KEY integer, + PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY), + LAST_EPO_TIME BIGINT, + SOURCE_NAME varchar, + CL_FLAG integer + )""" + ,) else: - return False - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) - finally: - cur.close() - - -def commit_db(connection_db): - if os.getenv("pytest", "") == "test": - return True - try: - connection_db.commit() # <--- makes sure the change is shown in the database + msg = "HBT:vnf_table_2 is already there" + _logger.info(msg) return True except psycopg2.DatabaseError as e: msg = "COMMON:Error %s" % e _logger.error(msg) return False + +def check_process_reconfiguration(username, password, host, port, database_name): + """ + Verify if DB configuration in progress + """ + while True: + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( + username, password, host, port, database_name + ) + if hbc_state == "RECONFIGURATION": + _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING") + time.sleep(10) + else: + _logger.info("HBT: hb_common state is RUNNING") + break + return False + +def postgres_db_open(): + """ + Wrapper function for returning DB connection object + """ + + global configjsonfile + + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(configjsonfile) + connection = psycopg2.connect(database=db_name, user=user_name, password=password, host=ip_address, port=port_num) + return connection + + +def sql_executor (query, *values): + """ + wrapper method for DB operation + """ + _logger.info("HBT query: %s values: %s", query, values) + + connection_db = postgres_db_open() + cur = connection_db.cursor() + cur.execute(query, values) + update_commands = ["CREATE", "INSERT", "UPDATE"] + if any (x in query for x in update_commands): + try: + connection_db.commit() # <--- makes sure the change is shown in the database + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + return cur def commit_and_close_db(connection_db): - if os.getenv("pytest", "") == "test": - return True try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() @@ -277,18 +307,10 @@ def commit_and_close_db(connection_db): if __name__ == "__main__": get_logger.configure_logger("htbtworker") - jsfile = sys.argv[1] + configjsonfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg = "HBT:The config file name passed is -%s", jsfile + msg = "HBT:The config file name passed is -%s", configjsonfile _logger.info(msg) - ( - ip_address, - port_num, - user_name, - password, - db_name, - cbs_polling_required, - cbs_polling_interval, - ) = db.read_hb_properties(jsfile) - process_msg(jsfile, user_name, password, ip_address, port_num, db_name) + process_msg(configjsonfile) + diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index fd1a09c..23d4835 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Samsung Electronics. All rights reserved. @@ -40,14 +40,15 @@ import yaml import socket import os.path as path import tempfile +import psycopg2 from pathlib import Path import check_health import htbtworker as heartbeat import get_logger import cbs_polling -from mod import trapd_settings as tds -from mod.trapd_get_cbs_config import get_cbs_config +from mod import htbt_settings as tds +from mod.htbt_get_cbs_config import get_cbs_config hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) _logger = logging.getLogger(__name__) @@ -83,14 +84,7 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password def read_hb_common(user_name, password, ip_address, port_num, db_name): - env_pytest = os.getenv("pytest", "") - if env_pytest == "test": - hbc_pid = 10 - hbc_src_name = "srvc_name" - hbc_time = 1584595881 - hbc_state = "RUNNING" - return hbc_pid, hbc_state, hbc_src_name, hbc_time - connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = heartbeat.postgres_db_open() cur = connection_db.cursor() cur.execute("SELECT process_id, source_name, last_accessed_time, current_state FROM hb_common") rows = cur.fetchall() @@ -98,7 +92,6 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): hbc_src_name = rows[0][1] hbc_time = rows[0][2] hbc_state = rows[0][3] - heartbeat.commit_and_close_db(connection_db) cur.close() return hbc_pid, hbc_state, hbc_src_name, hbc_time @@ -109,9 +102,9 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password, source_name = source_name + "-" + os.getenv("SERVICE_NAME", "") env_pytest = os.getenv("pytest", "") if env_pytest != "test": - connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = heartbeat.postgres_db_open() cur = connection_db.cursor() - if heartbeat.db_table_creation_check(connection_db, "hb_common") is False: + if db_table_creation_check(connection_db, "hb_common") is False: cur.execute( """ CREATE TABLE hb_common ( @@ -133,18 +126,33 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password, heartbeat.commit_and_close_db(connection_db) cur.close() - +def db_table_creation_check(connection_db, table_name): + cur = connection_db.cursor() + try: + cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,)) + database_names = cur.fetchone() + if database_names is not None: + if table_name in database_names: + return True + else: + return False + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + finally: + cur.close() + def create_update_vnf_table_1(jsfile, update_db, connection_db): with open(jsfile, "r") as outfile: cfg = json.load(outfile) hbcfg = cfg["heartbeat_config"] jhbcfg = json.loads(hbcfg) + cur = connection_db.cursor() env_pytest = os.getenv("pytest", "") if env_pytest == "test": vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] else: - cur = connection_db.cursor() - if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False: + if db_table_creation_check(connection_db, "vnf_table_1") is False: cur.execute( """ CREATE TABLE vnf_table_1 ( @@ -232,14 +240,14 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): def hb_worker_process(config_file_path): subprocess.call([ABSOLUTE_PATH1, config_file_path]) sys.stdout.flush() - _logger.info("MSHBT:Creaated Heartbeat worker process") + _logger.info("MSHBT:Created Heartbeat worker process") return def db_monitoring_process(current_pid, jsfile): subprocess.call([ABSOLUTE_PATH2, str(current_pid), jsfile]) sys.stdout.flush() - _logger.info("MSHBT:Creaated DB Monitoring process") + _logger.info("MSHBT:Created DB Monitoring process") return @@ -335,10 +343,10 @@ def create_update_db(update_db, jsfile, ip_address, port_num, user_name, passwor create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name _logger.info(msg) - connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = heartbeat.postgres_db_open() cur = connection_db.cursor() if update_db == 0: - if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False: + if db_table_creation_check(connection_db, "vnf_table_1") is False: create_update_vnf_table_1(jsfile, update_db, connection_db) else: create_update_vnf_table_1(jsfile, update_db, connection_db) @@ -367,6 +375,7 @@ def create_process(job_list, jsfile, pid_current): return job_list + def main(): get_logger.configure_logger("misshtbtd") pid_current = os.getpid() @@ -412,7 +421,7 @@ def main(): _logger.info("MSHBD:Current process id is %d", pid_current) _logger.info("MSHBD:Now be in a continuous loop") i = 0 - while True: + while True: # pragma: no cover hbc_pid, hbc_state, hbc_src_name, hbc_time = read_hb_common( user_name, password, ip_address, port_num, db_name ) @@ -432,16 +441,6 @@ def main(): _logger.info(msg) source_name = socket.gethostname() source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) - env_pytest = os.getenv("pytest", "") - if env_pytest == "test": - if i == 2: - hbc_pid = pid_current - source_name = hbc_src_name - hbc_state = "RECONFIGURATION" - elif i > 3: - hbc_pid = pid_current - source_name = hbc_src_name - hbc_state = "RUNNING" if time_difference < 60: if (int(hbc_pid) == int(pid_current)) and (source_name == hbc_src_name): msg = "MSHBD:config status is", hbc_state @@ -491,7 +490,7 @@ def main(): _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE") else: jsfile = fetch_json_file() - msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current + msg = "MSHBD: Creating HB and DBM threads. The param passed %d and %s", jsfile, pid_current _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) hbc_pid, hbc_state, hbc_src_name, hbc_time = read_hb_common( @@ -504,24 +503,6 @@ def main(): else: _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") time.sleep(25) - if os.getenv("pytest", "") == "test": - i = i + 1 - if i > 5: - _logger.info("Terminating main process for pytest") - cbs_polling_proc.terminate() - time.sleep(1) - cbs_polling_proc.join() - if len(job_list) > 0: - job_list[0].terminate() - time.sleep(1) - job_list[0].join() - job_list.remove(job_list[0]) - if len(job_list) > 0: - job_list[0].terminate() - time.sleep(1) - job_list[0].join() - job_list.remove(job_list[0]) - break except Exception as e: msg = "MSHBD:Exception as %s" % (str(traceback.format_exc())) diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/htbt_exit.py index 7791b31..3779e15 100644 --- a/miss_htbt_service/mod/trapd_exit.py +++ b/miss_htbt_service/mod/htbt_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ @@ -17,7 +17,7 @@ # ============LICENSE_END========================================================= """ -trapc_exit_snmptrapd is responsible for removing any existing runtime PID +htbt_exit is responsible for removing any existing runtime PID file, and exiting with the provided (param 1) exit code """ @@ -25,7 +25,7 @@ __docformat__ = "restructuredtext" import sys import os -from mod.trapd_runtime_pid import rm_pid +from mod.htbt_runtime_pid import rm_pid prog_name = os.path.basename(__file__) diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/htbt_get_cbs_config.py index 034b32b..613b46f 100644 --- a/miss_htbt_service/mod/trapd_get_cbs_config.py +++ b/miss_htbt_service/mod/htbt_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Samsung Electronics. All rights reserved. @@ -28,15 +28,15 @@ __docformat__ = "restructuredtext" import json import os from onap_dcae_cbs_docker_client.client import get_config -from mod import trapd_settings as tds -from mod.trapd_exit import cleanup, cleanup_and_exit -from mod.trapd_io import stdout_logger +from mod import htbt_settings as tds +from mod.htbt_exit import cleanup, cleanup_and_exit +from mod.htbt_io import stdout_logger prog_name = os.path.basename(__file__) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # -# function: trapd_get_config_sim +# function: htbt_get_config_sim # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/htbt_http_session.py index 17eb302..f0a409a 100644 --- a/miss_htbt_service/mod/trapd_http_session.py +++ b/miss_htbt_service/mod/htbt_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ # ============LICENSE_END========================================================= """ -trapd_http_session establishes an http session for future use in publishing +htbt_http_session establishes an http session for future use in publishing messages to the dmaap cluster. """ diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/htbt_io.py index 26445fc..3f16a7b 100644 --- a/miss_htbt_service/mod/trapd_io.py +++ b/miss_htbt_service/mod/htbt_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START=======================================================) -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/htbt_runtime_pid.py index 823d29f..d086ed2 100644 --- a/miss_htbt_service/mod/trapd_runtime_pid.py +++ b/miss_htbt_service/mod/htbt_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ # ============LICENSE_END========================================================= """ -trapd_runtime_pid maintains a 'PID file' (file that contains the +htbt_runtime_pid maintains a 'PID file' (file that contains the PID of currently running trap receiver) """ diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/htbt_settings.py index 0f6a9a1..b5b58cc 100644 --- a/miss_htbt_service/mod/trapd_settings.py +++ b/miss_htbt_service/mod/htbt_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START=======================================================) -# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/htbt_vnf_table.py index f9fecbb..3396e80 100644 --- a/miss_htbt_service/mod/trapd_vnf_table.py +++ b/miss_htbt_service/mod/htbt_vnf_table.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # Copyright (c) 2020 Deutsche Telekom. All rights reserved. # Copyright (c) 2021 Samsung Electronics. All rights reserved. @@ -21,7 +21,7 @@ # Author Kiran Mandal (km386e) """ -trapd_vnf_table verifies the successful creation of DB Tables. +htbt_vnf_table verifies the successful creation of DB Tables. """ import logging import os @@ -57,9 +57,9 @@ def hb_properties(): def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name): - connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = pm.postgres_db_open() try: - _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1") + _db_status = db.db_table_creation_check(connection_db, "vnf_table_1") except Exception: return None @@ -68,20 +68,20 @@ def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name): def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name): - connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = pm.postgres_db_open() try: - _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2") + _db_status = db.db_table_creation_check(connection_db, "vnf_table_2") except Exception: return None return _db_status -def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name): +def verify_DB_creation_hb_common(): - connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) + connection_db = pm.postgres_db_open() try: - _db_status = pm.db_table_creation_check(connection_db, "hb_common") + _db_status = db.db_table_creation_check(connection_db, "hb_common") except Exception: return None |