aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2022-11-17 14:51:38 -0500
committerVijay Venkatesh Kumar <vv770d@att.com>2023-01-05 17:10:16 -0500
commit341b5bb2347c30344662675936b90b325efe5520 (patch)
treec5fa256f77cae915bd758a060b69e53c4039e7c1 /miss_htbt_service
parent2e840627a6b01475eb98b52f0a45593b4f2b8641 (diff)
Heartbeat code refactoring
code optimization & test improvement Issue-ID: DCAEGEN2-2953 Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com> Change-Id: I99229d966c13ad666ac994ab5a582aeeaa306639 Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Diffstat (limited to 'miss_htbt_service')
-rw-r--r--miss_htbt_service/config.json1
-rw-r--r--miss_htbt_service/db_monitoring.py43
-rw-r--r--miss_htbt_service/htbtworker.py384
-rw-r--r--miss_htbt_service/misshtbtd.py83
-rw-r--r--miss_htbt_service/mod/htbt_exit.py (renamed from miss_htbt_service/mod/trapd_exit.py)6
-rw-r--r--miss_htbt_service/mod/htbt_get_cbs_config.py (renamed from miss_htbt_service/mod/trapd_get_cbs_config.py)10
-rw-r--r--miss_htbt_service/mod/htbt_http_session.py (renamed from miss_htbt_service/mod/trapd_http_session.py)4
-rw-r--r--miss_htbt_service/mod/htbt_io.py (renamed from miss_htbt_service/mod/trapd_io.py)2
-rw-r--r--miss_htbt_service/mod/htbt_runtime_pid.py (renamed from miss_htbt_service/mod/trapd_runtime_pid.py)4
-rw-r--r--miss_htbt_service/mod/htbt_settings.py (renamed from miss_htbt_service/mod/trapd_settings.py)2
-rw-r--r--miss_htbt_service/mod/htbt_vnf_table.py (renamed from miss_htbt_service/mod/trapd_vnf_table.py)18
11 files changed, 283 insertions, 274 deletions
diff --git a/miss_htbt_service/config.json b/miss_htbt_service/config.json
new file mode 100644
index 0000000..e853f6a
--- /dev/null
+++ b/miss_htbt_service/config.json
@@ -0,0 +1 @@
+{"pg_ipAddress": "10.0.4.1", "pg_userName": "postgres", "pg_dbName": "postgres", "streams_subscribes": {"ves-heartbeat": {"type": "message_router", "dmaap_info": {"topic_url": "http://10.12.5.252:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT"}}}, "consumerID": "1", "CBS_polling_interval": "300", "pg_passwd": "postgres", "streams_publishes": {"dcae_cl_out": {"type": "message_router", "dmaap_info": {"topic_url": "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT"}}}, "pg_portNum": "5432", "CBS_polling_allowed": "True", "groupID": "groupID", "heartbeat_config": "{\"vnfs\": [{\"eventName\": \"Heartbeat_vDNS\",\"heartbeatcountmissed\": 3,\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\t\"policyVersion\": \"1.0.0.5\",\t\"policyName\":\"vFireWall\",\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\"target_type\": \"VNF\",\t\"target\": \"genVnfName\",\t\"version\": \"1.0\"}, {\"eventName\": \"Heartbeat_vFW\",\"heartbeatcountmissed\": 3,\t\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\"policyVersion\": \"1.0.0.5\",\"policyName\": \"vFireWall\",\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\t\"target_type\":\"VNF\",\t\"target\": \"genVnfName\",\t\"version\": \"1.0\"}, {\"eventName\": \"Heartbeat_xx\",\"heartbeatcountmissed\": 3,\t\"heartbeatinterval\": 60,\"closedLoopControlName\": \"ControlLoopEvent1\",\"policyVersion\": \"1.0.0.5\",\"policyName\": \"vFireWall\",\t\"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\",\"target_type\": \"VNF\",\"target\": \"genVnfName\",\"version\": \"1.0\"}]}"}
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index eac8a91..cbd8f24 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Fujitsu Ltd.
@@ -30,6 +30,7 @@ import os
import socket
import time
import requests
+import psycopg2
import htbtworker as pm
import misshtbtd as db
import get_logger
@@ -159,10 +160,6 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
while True:
time.sleep(20)
- env_pytest = os.getenv("pytest", "")
- if env_pytest == "test":
- break
-
try:
with open(json_file, "r") as outfile:
cfg = json.load(outfile)
@@ -177,9 +174,9 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
)
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
- connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = pm.postgres_db_open()
cur = connection_db.cursor()
- if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING":
+ if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING": # pragma: no cover
_logger.info("DBM: Active DB Monitoring Instance")
cur.execute("SELECT event_name FROM vnf_table_1")
vnf_list = [item[0] for item in cur.fetchall()]
@@ -224,7 +221,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
epoc_time_sec = row[0][0]
src_name = row[0][1]
cl_flag = row[0][2]
- if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0:
+ if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover
sendControlLoopEvent(
"ONSET",
pol_url,
@@ -244,7 +241,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
(cl_flag, event_name, (source_name_key + 1)),
)
connection_db.commit()
- elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1:
+ elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover
sendControlLoopEvent(
"ABATED",
pol_url,
@@ -277,17 +274,21 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
"""
else: # pragma: no cover
msg = "DBM:Inactive instance or hb_common state is not RUNNING"
- _logger.info(msg)
- pm.commit_and_close_db(connection_db)
+ _logger.info(msg)
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.close()
+ return True
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
+ return False
cur.close()
break
-
-if __name__ == "__main__":
+def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1):
get_logger.configure_logger("db_monitoring")
_logger.info("DBM: DBM Process started")
- current_pid = sys.argv[1]
- jsfile = sys.argv[2]
(
ip_address,
port_num,
@@ -299,8 +300,12 @@ if __name__ == "__main__":
) = db.read_hb_properties(jsfile)
msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
- while True:
+ while number_of_iterations != 0:
+ number_of_iterations -= 1
db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
- env_pytest = os.getenv("pytest", "")
- if env_pytest == "test":
- break
+
+
+if __name__ == "__main__":
+ current_pid = sys.argv[1]
+ jsfile = sys.argv[2]
+ db_monitoring_wrapper (current_pid,jsfile)
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 6765266..3328973 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Fujitsu Ltd.
@@ -17,11 +17,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# Author Prakash Hosangady(ph553f@att.com)
-# Simple Microservice
-# Tracks Heartbeat messages on input topic in DMaaP
-# and poppulate the information in postgres DB
+
import logging
import psycopg2
@@ -31,55 +27,45 @@ import os.path as path
import json
import sys
import time
+
import misshtbtd as db
import get_logger
_logger = logging.getLogger(__name__)
+configjsonfile = "../etc/config.json"
-def read_json_file(i, prefix="../../tests"):
- if i == 0:
- with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile:
- cfg = json.load(outfile)
- elif i == 1:
- with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
- cfg = json.load(outfile)
- elif i == 2:
- with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile:
- cfg = json.load(outfile)
- return cfg
-
-
-def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
+def process_msg(cfgjsonfile, number_of_iterations=-1):
+ """
+ Function to poll events from MR continuously
+ and determine if matching configuration to be
+ tracked for missed HB function
+ """
+
global mr_url
- i = 0
+ global configjsonfile
+ configjsonfile = cfgjsonfile
sleep_duration = 20
- while True:
+ reconfig_Flag = False
+ while number_of_iterations != 0:
+ number_of_iterations -= 1
time.sleep(sleep_duration)
- with open(jsfile, "r") as outfile:
+ print ("*** CFG json file info " + configjsonfile)
+ with open(configjsonfile, "r") as outfile:
cfg = json.load(outfile)
+
+ print ("*** CFG info " + str(cfg))
mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
+
+ username = str(cfg["pg_userName"])
+ password = str(cfg["pg_passwd"])
+ db_host = str(cfg["pg_ipAddress"])
+ db_port = cfg["pg_portNum"]
+ database_name = str(cfg["pg_dbName"])
+
+ reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name)
+ eventname_list = get_eventnamelist ()
- while True:
- hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
- user_name, password, ip_address, port_num, db_name
- )
- if hbc_state == "RECONFIGURATION":
- _logger.info("HBT:Waiting for hb_common state to become RUNNING")
- time.sleep(10)
- else:
- break
-
- if os.getenv("pytest", "") == "test":
- eventname_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
- connection_db = 0
- else:
- connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
- cur = connection_db.cursor()
- cur.execute("SELECT event_name FROM vnf_table_1")
- eventname_list = [item[0] for item in cur.fetchall()]
- msg = "\n\nHBT:eventnameList values ", eventname_list
- _logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
get_url = mr_url + "/DefaultGroup/1?timeout=15000"
else:
@@ -87,105 +73,60 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if os.getenv("pytest", "") == "test":
- jsonobj = read_json_file(i)
- jobj = []
- jobj.append(jsonobj)
- i = i + 1
- msg = "HBT:newly received test message", jobj
- _logger.info(msg)
- if i >= 3:
- i = 0
- break
- else:
+ try:
+ res = requests.get(get_url)
+ except Exception as e:
+ # message-router may be down temporarily. continue polling loop to try again
+ _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e)
+ time.sleep(1)
+ continue
+ _logger.info("HBT: %s", res.text)
+ input_string = res.text
+ # If mrstatus in message body indicates some information, not json msg.
+ if "mrstatus" in input_string:
+ continue
+ jlist = input_string.split("\n")
+ print (jlist)
+ # Process the DMaaP input message retrieved
+ error = False
+ jobj = []
+ for line in jlist:
try:
- res = requests.get(get_url)
- except Exception as e:
- # message-router may be down temporarily. continue polling loop to try again
- _logger.error("HBT: Failed to fetch messages from DMaaP. get_url=%s", get_url, exc_info=e)
- time.sleep(1)
- continue
- _logger.info("HBT: %s", res.text)
- input_string = res.text
- # If mrstatus in message body indicates some information, not json msg.
- if "mrstatus" in input_string:
- continue
- jlist = input_string.split("\n")
- # Process the DMaaP input message retreived
- error = False
- for line in jlist:
- try:
- jobj = json.loads(line)
- except ValueError:
- msg = "HBT:Decoding JSON has failed"
- _logger.error(msg)
- error = True
- break
- if error:
- continue
- if len(jobj) == 0:
- continue
- for item in jobj:
- try:
- if os.getenv("pytest", "") == "test":
- jitem = jsonobj
- else:
- jitem = json.loads(item)
- srcname = jitem["event"]["commonEventHeader"]["sourceName"]
- lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
- # if lastEpochMicrosec looks like microsec, align it with millisec
- if lastepo > 1000000000000000:
- lastepo = int(lastepo / 1000)
- seqnum = jitem["event"]["commonEventHeader"]["sequence"]
- event_name = jitem["event"]["commonEventHeader"]["eventName"]
- except Exception as err:
- msg = "HBT message process error - ", err
+ jobj = json.loads(line)
+ except ValueError:
+ msg = "HBT:Decoding JSON has failed"
_logger.error(msg)
- continue
+ error = True
+ break
+ if error:
+ continue
+ if len(jobj) == 0:
+ continue
+
+ _logger.info("HBT jobj Array : %s", jobj)
+ for item in jobj:
+ srcname, lastepo, seqnum, event_name = parse_event(item)
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
- if db_table_creation_check(connection_db, "vnf_table_2") is False:
- msg = "HBT:Creating vnf_table_2"
- _logger.info(msg)
- cur.execute(
- """
- CREATE TABLE vnf_table_2 (
- EVENT_NAME varchar,
- SOURCE_NAME_KEY integer,
- PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY),
- LAST_EPO_TIME BIGINT,
- SOURCE_NAME varchar,
- CL_FLAG integer
- )"""
- )
- else:
- msg = "HBT:vnf_table_2 is already there"
- _logger.info(msg)
+
+ check_and_create_vnf2_table ()
+
if event_name in eventname_list: # pragma: no cover
- if os.getenv("pytest", "") == "test":
- break
- cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (event_name,))
+ cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name)
row = cur.fetchone()
source_name_count = row[0]
source_name_key = source_name_count + 1
cl_flag = 0
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (event_name, source_name_key, lastepo, srcname, cl_flag),
- )
- cur.execute(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
- (source_name_key, event_name),
- )
- else: # pragma: no cover
+ new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
+ else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
- cur.execute(
+ cur = sql_executor(
"SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
- (event_name, (source_name_key + 1)),
+ event_name, (source_name_key + 1)
)
row = cur.fetchall()
if len(row) == 0:
@@ -194,10 +135,10 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_srcname == srcname:
msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- cur.execute(
+ sql_executor(
"UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
"WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- (lastepo, srcname, event_name, (source_name_key + 1)),
+ lastepo, srcname, event_name, (source_name_key + 1)
)
source_name_key = source_name_count
break
@@ -208,63 +149,152 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if source_name_count == (source_name_key + 1):
source_name_key = source_name_count + 1
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (event_name, source_name_key, lastepo, srcname, cl_flag),
- )
- cur.execute(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- (source_name_key, event_name),
- )
+ new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
else:
- _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
- commit_db(connection_db)
- commit_and_close_db(connection_db)
- if os.getenv("pytest", "") != "test":
- cur.close()
+ _logger.info("HBT:eventName is not being monitored, Ignoring JSON message")
+ msg = "HBT: Looping to check for new events from DMAAP"
+ print ("HBT: Looping to check for new events from DMAAP")
+ _logger.info(msg)
+def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag):
+ """
+ Wrapper function to update event to tracking tables
+ """
+
+ sql_executor(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ event_name, source_name_key, lastepo, srcname, cl_flag
+ )
+ sql_executor(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
+ source_name_key, event_name
+ )
-def postgres_db_open(username, password, host, port, database_name):
- if os.getenv("pytest", "") == "test":
- return True
- connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
- return connection
+def parse_event (jsonstring):
+ """
+ Function to parse incoming event as json object
+ and parse out required attributes
+ """
+ _logger.info("HBT jsonstring: %s", jsonstring)
+ #convert string to object
+ jitem = json.loads(jsonstring)
+
+ try:
+ srcname = jitem["event"]["commonEventHeader"]["sourceName"]
+ lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
+ # if lastEpochMicrosec looks like microsec, align it with millisec
+ if lastepo > 1000000000000000:
+ lastepo = int(lastepo / 1000)
+ seqnum = jitem["event"]["commonEventHeader"]["sequence"]
+ event_name = jitem["event"]["commonEventHeader"]["eventName"]
+ msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
+ _logger.info(msg)
+ return (srcname,lastepo,seqnum,event_name)
+ except Exception as err:
+ msg = "HBT message process error - ", err
+ _logger.error(msg)
+def get_eventnamelist ():
+ """
+ Function to fetch active monitored eventname list
+ """
+ cur = sql_executor("SELECT event_name FROM vnf_table_1",)
+ eventname_list = [item[0] for item in cur.fetchall()]
+ msg = "\n\nHBT:eventnameList values ", eventname_list
+ _logger.info(msg)
+ return eventname_list
-def db_table_creation_check(connection_db, table_name):
- if os.getenv("pytest", "") == "test":
- return True
+def check_and_create_vnf2_table ():
+ """
+ Check and create vnf_table_2 used for tracking HB entries
+ """
try:
- cur = connection_db.cursor()
- cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,))
+ table_exist = False
+ cur = sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
database_names = cur.fetchone()
if database_names is not None:
- if table_name in database_names:
- return True
+ if "vnf_table_2" in database_names:
+ table_exist = True
+
+ if table_exist == False:
+ msg = "HBT:Creating vnf_table_2"
+ _logger.info(msg)
+ sql_executor(
+ """
+ CREATE TABLE vnf_table_2 (
+ EVENT_NAME varchar,
+ SOURCE_NAME_KEY integer,
+ PRIMARY KEY(EVENT_NAME, SOURCE_NAME_KEY),
+ LAST_EPO_TIME BIGINT,
+ SOURCE_NAME varchar,
+ CL_FLAG integer
+ )"""
+ ,)
else:
- return False
- except psycopg2.DatabaseError as e:
- msg = "COMMON:Error %s" % e
- _logger.error(msg)
- finally:
- cur.close()
-
-
-def commit_db(connection_db):
- if os.getenv("pytest", "") == "test":
- return True
- try:
- connection_db.commit() # <--- makes sure the change is shown in the database
+ msg = "HBT:vnf_table_2 is already there"
+ _logger.info(msg)
return True
except psycopg2.DatabaseError as e:
msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
+
+def check_process_reconfiguration(username, password, host, port, database_name):
+ """
+ Verify if DB configuration in progress
+ """
+ while True:
+ hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
+ username, password, host, port, database_name
+ )
+ if hbc_state == "RECONFIGURATION":
+ _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING")
+ time.sleep(10)
+ else:
+ _logger.info("HBT: hb_common state is RUNNING")
+ break
+ return False
+
+def postgres_db_open():
+ """
+ Wrapper function for returning DB connection object
+ """
+
+ global configjsonfile
+
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(configjsonfile)
+ connection = psycopg2.connect(database=db_name, user=user_name, password=password, host=ip_address, port=port_num)
+ return connection
+
+
+def sql_executor (query, *values):
+ """
+ wrapper method for DB operation
+ """
+ _logger.info("HBT query: %s values: %s", query, values)
+
+ connection_db = postgres_db_open()
+ cur = connection_db.cursor()
+ cur.execute(query, values)
+ update_commands = ["CREATE", "INSERT", "UPDATE"]
+ if any (x in query for x in update_commands):
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
+ return cur
def commit_and_close_db(connection_db):
- if os.getenv("pytest", "") == "test":
- return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
@@ -277,18 +307,10 @@ def commit_and_close_db(connection_db):
if __name__ == "__main__":
get_logger.configure_logger("htbtworker")
- jsfile = sys.argv[1]
+ configjsonfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
- msg = "HBT:The config file name passed is -%s", jsfile
+ msg = "HBT:The config file name passed is -%s", configjsonfile
_logger.info(msg)
- (
- ip_address,
- port_num,
- user_name,
- password,
- db_name,
- cbs_polling_required,
- cbs_polling_interval,
- ) = db.read_hb_properties(jsfile)
- process_msg(jsfile, user_name, password, ip_address, port_num, db_name)
+ process_msg(configjsonfile)
+
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index fd1a09c..23d4835 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Samsung Electronics. All rights reserved.
@@ -40,14 +40,15 @@ import yaml
import socket
import os.path as path
import tempfile
+import psycopg2
from pathlib import Path
import check_health
import htbtworker as heartbeat
import get_logger
import cbs_polling
-from mod import trapd_settings as tds
-from mod.trapd_get_cbs_config import get_cbs_config
+from mod import htbt_settings as tds
+from mod.htbt_get_cbs_config import get_cbs_config
hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
_logger = logging.getLogger(__name__)
@@ -83,14 +84,7 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
def read_hb_common(user_name, password, ip_address, port_num, db_name):
- env_pytest = os.getenv("pytest", "")
- if env_pytest == "test":
- hbc_pid = 10
- hbc_src_name = "srvc_name"
- hbc_time = 1584595881
- hbc_state = "RUNNING"
- return hbc_pid, hbc_state, hbc_src_name, hbc_time
- connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = heartbeat.postgres_db_open()
cur = connection_db.cursor()
cur.execute("SELECT process_id, source_name, last_accessed_time, current_state FROM hb_common")
rows = cur.fetchall()
@@ -98,7 +92,6 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name):
hbc_src_name = rows[0][1]
hbc_time = rows[0][2]
hbc_state = rows[0][3]
- heartbeat.commit_and_close_db(connection_db)
cur.close()
return hbc_pid, hbc_state, hbc_src_name, hbc_time
@@ -109,9 +102,9 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password,
source_name = source_name + "-" + os.getenv("SERVICE_NAME", "")
env_pytest = os.getenv("pytest", "")
if env_pytest != "test":
- connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = heartbeat.postgres_db_open()
cur = connection_db.cursor()
- if heartbeat.db_table_creation_check(connection_db, "hb_common") is False:
+ if db_table_creation_check(connection_db, "hb_common") is False:
cur.execute(
"""
CREATE TABLE hb_common (
@@ -133,18 +126,33 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password,
heartbeat.commit_and_close_db(connection_db)
cur.close()
-
+def db_table_creation_check(connection_db, table_name):
+ cur = connection_db.cursor()
+ try:
+ cur.execute("SELECT * FROM information_schema.tables WHERE table_name = %s", (table_name,))
+ database_names = cur.fetchone()
+ if database_names is not None:
+ if table_name in database_names:
+ return True
+ else:
+ return False
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
+ finally:
+ cur.close()
+
def create_update_vnf_table_1(jsfile, update_db, connection_db):
with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
hbcfg = cfg["heartbeat_config"]
jhbcfg = json.loads(hbcfg)
+ cur = connection_db.cursor()
env_pytest = os.getenv("pytest", "")
if env_pytest == "test":
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
else:
- cur = connection_db.cursor()
- if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False:
+ if db_table_creation_check(connection_db, "vnf_table_1") is False:
cur.execute(
"""
CREATE TABLE vnf_table_1 (
@@ -232,14 +240,14 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
def hb_worker_process(config_file_path):
subprocess.call([ABSOLUTE_PATH1, config_file_path])
sys.stdout.flush()
- _logger.info("MSHBT:Creaated Heartbeat worker process")
+ _logger.info("MSHBT:Created Heartbeat worker process")
return
def db_monitoring_process(current_pid, jsfile):
subprocess.call([ABSOLUTE_PATH2, str(current_pid), jsfile])
sys.stdout.flush()
- _logger.info("MSHBT:Creaated DB Monitoring process")
+ _logger.info("MSHBT:Created DB Monitoring process")
return
@@ -335,10 +343,10 @@ def create_update_db(update_db, jsfile, ip_address, port_num, user_name, passwor
create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
_logger.info(msg)
- connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = heartbeat.postgres_db_open()
cur = connection_db.cursor()
if update_db == 0:
- if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False:
+ if db_table_creation_check(connection_db, "vnf_table_1") is False:
create_update_vnf_table_1(jsfile, update_db, connection_db)
else:
create_update_vnf_table_1(jsfile, update_db, connection_db)
@@ -367,6 +375,7 @@ def create_process(job_list, jsfile, pid_current):
return job_list
+
def main():
get_logger.configure_logger("misshtbtd")
pid_current = os.getpid()
@@ -412,7 +421,7 @@ def main():
_logger.info("MSHBD:Current process id is %d", pid_current)
_logger.info("MSHBD:Now be in a continuous loop")
i = 0
- while True:
+ while True: # pragma: no cover
hbc_pid, hbc_state, hbc_src_name, hbc_time = read_hb_common(
user_name, password, ip_address, port_num, db_name
)
@@ -432,16 +441,6 @@ def main():
_logger.info(msg)
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
- env_pytest = os.getenv("pytest", "")
- if env_pytest == "test":
- if i == 2:
- hbc_pid = pid_current
- source_name = hbc_src_name
- hbc_state = "RECONFIGURATION"
- elif i > 3:
- hbc_pid = pid_current
- source_name = hbc_src_name
- hbc_state = "RUNNING"
if time_difference < 60:
if (int(hbc_pid) == int(pid_current)) and (source_name == hbc_src_name):
msg = "MSHBD:config status is", hbc_state
@@ -491,7 +490,7 @@ def main():
_logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
else:
jsfile = fetch_json_file()
- msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
+ msg = "MSHBD: Creating HB and DBM threads. The param passed %d and %s", jsfile, pid_current
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
hbc_pid, hbc_state, hbc_src_name, hbc_time = read_hb_common(
@@ -504,24 +503,6 @@ def main():
else:
_logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
time.sleep(25)
- if os.getenv("pytest", "") == "test":
- i = i + 1
- if i > 5:
- _logger.info("Terminating main process for pytest")
- cbs_polling_proc.terminate()
- time.sleep(1)
- cbs_polling_proc.join()
- if len(job_list) > 0:
- job_list[0].terminate()
- time.sleep(1)
- job_list[0].join()
- job_list.remove(job_list[0])
- if len(job_list) > 0:
- job_list[0].terminate()
- time.sleep(1)
- job_list[0].join()
- job_list.remove(job_list[0])
- break
except Exception as e:
msg = "MSHBD:Exception as %s" % (str(traceback.format_exc()))
diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/htbt_exit.py
index 7791b31..3779e15 100644
--- a/miss_htbt_service/mod/trapd_exit.py
+++ b/miss_htbt_service/mod/htbt_exit.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
@@ -17,7 +17,7 @@
# ============LICENSE_END=========================================================
"""
-trapc_exit_snmptrapd is responsible for removing any existing runtime PID
+htbt_exit is responsible for removing any existing runtime PID
file, and exiting with the provided (param 1) exit code
"""
@@ -25,7 +25,7 @@ __docformat__ = "restructuredtext"
import sys
import os
-from mod.trapd_runtime_pid import rm_pid
+from mod.htbt_runtime_pid import rm_pid
prog_name = os.path.basename(__file__)
diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/htbt_get_cbs_config.py
index 034b32b..613b46f 100644
--- a/miss_htbt_service/mod/trapd_get_cbs_config.py
+++ b/miss_htbt_service/mod/htbt_get_cbs_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Samsung Electronics. All rights reserved.
@@ -28,15 +28,15 @@ __docformat__ = "restructuredtext"
import json
import os
from onap_dcae_cbs_docker_client.client import get_config
-from mod import trapd_settings as tds
-from mod.trapd_exit import cleanup, cleanup_and_exit
-from mod.trapd_io import stdout_logger
+from mod import htbt_settings as tds
+from mod.htbt_exit import cleanup, cleanup_and_exit
+from mod.htbt_io import stdout_logger
prog_name = os.path.basename(__file__)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
-# function: trapd_get_config_sim
+# function: htbt_get_config_sim
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/htbt_http_session.py
index 17eb302..f0a409a 100644
--- a/miss_htbt_service/mod/trapd_http_session.py
+++ b/miss_htbt_service/mod/htbt_http_session.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
# ============LICENSE_END=========================================================
"""
-trapd_http_session establishes an http session for future use in publishing
+htbt_http_session establishes an http session for future use in publishing
messages to the dmaap cluster.
"""
diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/htbt_io.py
index 26445fc..3f16a7b 100644
--- a/miss_htbt_service/mod/trapd_io.py
+++ b/miss_htbt_service/mod/htbt_io.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================)
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/htbt_runtime_pid.py
index 823d29f..d086ed2 100644
--- a/miss_htbt_service/mod/trapd_runtime_pid.py
+++ b/miss_htbt_service/mod/htbt_runtime_pid.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
# ============LICENSE_END=========================================================
"""
-trapd_runtime_pid maintains a 'PID file' (file that contains the
+htbt_runtime_pid maintains a 'PID file' (file that contains the
PID of currently running trap receiver)
"""
diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/htbt_settings.py
index 0f6a9a1..b5b58cc 100644
--- a/miss_htbt_service/mod/trapd_settings.py
+++ b/miss_htbt_service/mod/htbt_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================)
-# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2023 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/htbt_vnf_table.py
index f9fecbb..3396e80 100644
--- a/miss_htbt_service/mod/trapd_vnf_table.py
+++ b/miss_htbt_service/mod/htbt_vnf_table.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2023 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# Copyright (c) 2021 Samsung Electronics. All rights reserved.
@@ -21,7 +21,7 @@
# Author Kiran Mandal (km386e)
"""
-trapd_vnf_table verifies the successful creation of DB Tables.
+htbt_vnf_table verifies the successful creation of DB Tables.
"""
import logging
import os
@@ -57,9 +57,9 @@ def hb_properties():
def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name):
- connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = pm.postgres_db_open()
try:
- _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1")
+ _db_status = db.db_table_creation_check(connection_db, "vnf_table_1")
except Exception:
return None
@@ -68,20 +68,20 @@ def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name):
def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name):
- connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = pm.postgres_db_open()
try:
- _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2")
+ _db_status = db.db_table_creation_check(connection_db, "vnf_table_2")
except Exception:
return None
return _db_status
-def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name):
+def verify_DB_creation_hb_common():
- connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ connection_db = pm.postgres_db_open()
try:
- _db_status = pm.db_table_creation_check(connection_db, "hb_common")
+ _db_status = db.db_table_creation_check(connection_db, "hb_common")
except Exception:
return None