aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/misshtbtd.py
diff options
context:
space:
mode:
authorHansen, Tony (th1395) <th1395@att.com>2021-12-01 22:01:56 +0000
committerHansen, Tony (th1395) <th1395@att.com>2021-12-02 19:58:31 +0000
commit2108563705a2ec8bb80029d36122c69fa4d06df5 (patch)
tree1453b42bc3535635b136d0963a290243e26ae35f /miss_htbt_service/misshtbtd.py
parent8d7c0201456b7f9af6e91fea90354f4c3de323fe (diff)
run the black formatting tool on python code
also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com> Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Diffstat (limited to 'miss_htbt_service/misshtbtd.py')
-rw-r--r--miss_htbt_service/misshtbtd.py257
1 files changed, 172 insertions, 85 deletions
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 6be2260..5ba0860 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -2,9 +2,9 @@
# ============LICENSE_START=======================================================
# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json"
def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
from psycopg2 import connect
+
try:
con = connect(user=user_name, host=ip_address, password=password)
database_name = db_name
@@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
def read_hb_common(user_name, password, ip_address, port_num, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
hbc_pid = 10
hbc_srcName = "srvc_name"
hbc_time = 1584595881
@@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name):
def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
- source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test':
+ source_name = source_name + "-" + os.getenv("SERVICE_NAME", "")
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test":
connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "hb_common") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE hb_common (
PROCESS_ID integer primary key,
SOURCE_NAME varchar,
LAST_ACCESSED_TIME integer,
CURRENT_STATE varchar
- )""")
+ )"""
+ )
cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state))
_logger.info("MSHBT:Created hb_common DB and updated new values")
elif update_flg == 1:
- cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
- "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name))
+ cur.execute(
+ "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
+ "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s",
+ (current_time, state, process_id, source_name),
+ )
_logger.info("MSHBT:Updated hb_common DB with new values")
heartbeat.commit_and_close_db(connection_db)
cur.close()
def create_update_vnf_table_1(jsfile, update_db, connection_db):
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- hbcfg = cfg['heartbeat_config']
+ hbcfg = cfg["heartbeat_config"]
jhbcfg = json.loads(hbcfg)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
else:
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_1 (
EVENT_NAME varchar primary key,
HEARTBEAT_MISSED_COUNT integer,
@@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
VERSION varchar,
SOURCE_NAME_COUNT integer,
VALIDITY_FLAG integer
- )""")
+ )"""
+ )
_logger.info("MSHBT:Created vnf_table_1 table")
if update_db == 1:
cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1")
@@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
# Put some initial values into the queue
cur.execute("SELECT event_name FROM vnf_table_1")
vnf_list = [item[0] for item in cur.fetchall()]
- for vnf in (jhbcfg['vnfs']):
- nfc = vnf['eventName']
+ for vnf in jhbcfg["vnfs"]:
+ nfc = vnf["eventName"]
validity_flag = 1
source_name_count = 0
- missed = vnf['heartbeatcountmissed']
- intvl = vnf['heartbeatinterval']
- clloop = vnf['closedLoopControlName']
- policyVersion = vnf['policyVersion']
- policyName = vnf['policyName']
- policyScope = vnf['policyScope']
- target_type = vnf['target_type']
- target = vnf['target']
- version = vnf['version']
-
- if envPytest == 'test':
+ missed = vnf["heartbeatcountmissed"]
+ intvl = vnf["heartbeatinterval"]
+ clloop = vnf["closedLoopControlName"]
+ policyVersion = vnf["policyVersion"]
+ policyName = vnf["policyName"]
+ policyScope = vnf["policyScope"]
+ target_type = vnf["target_type"]
+ target = vnf["target"]
+ version = vnf["version"]
+
+ if envPytest == "test":
# skip executing SQL in test
continue
if nfc not in vnf_list:
- cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
- (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target,
- version, source_name_count, validity_flag))
+ cur.execute(
+ "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
+ (
+ nfc,
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ source_name_count,
+ validity_flag,
+ ),
+ )
_logger.debug("Inserted new event_name = %s into vnf_table_1", nfc)
else:
- cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
+ cur.execute(
+ """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s,
TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""",
- (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version,
- validity_flag, nfc))
- if envPytest != 'test':
+ (
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ validity_flag,
+ nfc,
+ ),
+ )
+ if envPytest != "test":
cur.close()
_logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
@@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile):
def read_hb_properties_default():
# Read the hbproperties.yaml for postgress and CBS related data
- s = open(hb_properties_file, 'r')
+ s = open(hb_properties_file, "r")
a = yaml.full_load(s)
- if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None):
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
+ if (
+ (os.getenv("pg_ipAddress") is None)
+ or (os.getenv("pg_portNum") is None)
+ or (os.getenv("pg_userName") is None)
+ or (os.getenv("pg_passwd") is None)
+ ):
+ ip_address = a["pg_ipAddress"]
+ port_num = a["pg_portNum"]
+ user_name = a["pg_userName"]
+ password = a["pg_passwd"]
else:
- ip_address = os.getenv('pg_ipAddress')
- port_num = os.getenv('pg_portNum')
- user_name = os.getenv('pg_userName')
- password = os.getenv('pg_passwd')
+ ip_address = os.getenv("pg_ipAddress")
+ port_num = os.getenv("pg_portNum")
+ user_name = os.getenv("pg_userName")
+ password = os.getenv("pg_passwd")
- dbName = a['pg_dbName']
+ dbName = a["pg_dbName"]
db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
+ cbs_polling_required = a["CBS_polling_allowed"]
+ cbs_polling_interval = a["CBS_polling_interval"]
s.close()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
def read_hb_properties(jsfile):
try:
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
except Exception as err:
msg = "CBS Json file load error - " + str(err)
@@ -242,20 +282,20 @@ def read_hb_properties(jsfile):
return read_hb_properties_default()
try:
- ip_address = str(cfg['pg_ipAddress'])
- port_num = str(cfg['pg_portNum'])
- user_name = str(cfg['pg_userName'])
- password = str(cfg['pg_passwd'])
- dbName = str(cfg['pg_dbName'])
+ ip_address = str(cfg["pg_ipAddress"])
+ port_num = str(cfg["pg_portNum"])
+ user_name = str(cfg["pg_userName"])
+ password = str(cfg["pg_passwd"])
+ dbName = str(cfg["pg_dbName"])
db_name = dbName.lower()
- cbs_polling_required = str(cfg['CBS_polling_allowed'])
- cbs_polling_interval = str(cfg['CBS_polling_interval'])
- consumer_id = str(cfg['consumerID'])
- group_id = str(cfg['groupID'])
- os.environ['consumerID'] = consumer_id
- os.environ['groupID'] = group_id
+ cbs_polling_required = str(cfg["CBS_polling_allowed"])
+ cbs_polling_interval = str(cfg["CBS_polling_interval"])
+ consumer_id = str(cfg["consumerID"])
+ group_id = str(cfg["groupID"])
+ os.environ["consumerID"] = consumer_id
+ os.environ["groupID"] = group_id
if "SERVICE_NAME" in cfg:
- os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"])
except Exception as err:
msg = "CBS Json file read parameter error - " + str(err)
_logger.error(msg)
@@ -273,7 +313,7 @@ def fetch_json_file() -> str:
# Try to get config from CBS. If succeeded, config json is stored to tds.c_config .
if get_cbs_config():
# Save config to temporary file
- with tempfile.NamedTemporaryFile('w', delete=False) as temp:
+ with tempfile.NamedTemporaryFile("w", delete=False) as temp:
_logger.info("MSHBD: New config saved to temp file %s", temp.name)
json.dump(tds.c_config, temp)
# Swap current config with downloaded config
@@ -289,8 +329,8 @@ def fetch_json_file() -> str:
def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test': # pragma: no cover
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test": # pragma: no cover
if update_db == 0:
create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
@@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current):
if len(job_list) == 0:
p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
time.sleep(1)
- p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,))
+ p2 = multiprocessing.Process(
+ target=db_monitoring_process,
+ args=(
+ pid_current,
+ jsfile,
+ ),
+ )
p1.start()
time.sleep(1)
p2.start()
@@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current):
def main():
- get_logger.configure_logger('misshtbtd')
+ get_logger.configure_logger("misshtbtd")
pid_current = os.getpid()
hc_proc = multiprocessing.Process(target=check_health.start_health_check_server)
cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,))
@@ -334,15 +380,32 @@ def main():
job_list = []
jsfile = fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
- msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = read_hb_properties(jsfile)
+ msg = (
+ "MSHBT:HB Properties -",
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ )
_logger.info(msg)
update_db = 0
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
state = "RECONFIGURATION"
update_flg = 0
create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
- if cbs_polling_required == 'True':
+ if cbs_polling_required == "True":
# note: cbs_polling process must be started after `hb_common` table created
cbs_polling_proc.start()
_logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid)
@@ -350,17 +413,27 @@ def main():
_logger.info("MSHBD:Now be in a continuous loop")
i = 0
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time
_logger.info(msg)
current_time = int(round(time.time()))
time_difference = current_time - hbc_time
- msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference
+ msg = (
+ "MSHBD:pid,srcName,state,time,ctime,timeDiff is",
+ hbc_pid,
+ hbc_srcName,
+ hbc_state,
+ hbc_time,
+ current_time,
+ time_difference,
+ )
_logger.info(msg)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
if i == 2:
hbc_pid = pid_current
source_name = hbc_srcName
@@ -376,9 +449,13 @@ def main():
if hbc_state == "RUNNING":
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
elif hbc_state == "RECONFIGURATION":
- _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+ _logger.info(
+ "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes"
+ )
jsfile = fetch_json_file()
update_db = 1
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
@@ -387,7 +464,9 @@ def main():
job_list = create_process(job_list, jsfile, pid_current)
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
@@ -395,7 +474,11 @@ def main():
_logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
else:
jsfile = fetch_json_file()
- msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current
+ msg = (
+ "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",
+ jsfile,
+ pid_current,
+ )
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
else:
@@ -409,13 +492,17 @@ def main():
msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
update_flg = 1
- create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
time.sleep(25)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
i = i + 1
if i > 5:
_logger.info("Terminating main process for pytest")
@@ -451,5 +538,5 @@ def main():
cbs_polling_proc.join()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()