aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSatoshi Fujii <fujii-satoshi@jp.fujitsu.com>2021-06-15 13:19:51 +0000
committerSatoshi Fujii <fujii-satoshi@jp.fujitsu.com>2021-06-18 07:40:41 +0000
commitd39331be3f55876a8a6fc31c5a10f2d68eefe0b0 (patch)
tree53a2009159f82d9e362f6d3204fb01a8c24756ee
parent6c8c23c92cc9e01a652599ca9d97ffdcfd4f4883 (diff)
Reformat code
Use 4 spaces for indentation and put spaces for better readability. Signed-off-by: Satoshi Fujii <fujii-satoshi@jp.fujitsu.com> Issue-ID: DCAEGEN2-2833 Change-Id: I99aa4df83a32b077e2a3f336d17b6b64184c3c12
-rw-r--r--Changelog.md1
-rw-r--r--miss_htbt_service/cbs_polling.py26
-rw-r--r--miss_htbt_service/check_health.py6
-rw-r--r--miss_htbt_service/config_notif.py134
-rw-r--r--miss_htbt_service/db_monitoring.py172
-rw-r--r--miss_htbt_service/get_logger.py9
-rw-r--r--miss_htbt_service/htbtworker.py271
-rw-r--r--miss_htbt_service/misshtbtd.py365
8 files changed, 517 insertions, 467 deletions
diff --git a/Changelog.md b/Changelog.md
index f5fade1..6af1cd9 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- Cleanup code
- Removed unused imports
+ - Reformatted code whitespace
- Add target/ to .gitignore
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py
index aa6ac8d..d1253c0 100644
--- a/miss_htbt_service/cbs_polling.py
+++ b/miss_htbt_service/cbs_polling.py
@@ -36,32 +36,34 @@ _logger = get_logger.get_logger(__name__)
def pollCBS(current_pid):
jsfile = db.fetch_json_file()
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
- msg="CBSP:Main process ID in hb_common is %d",hbc_pid
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ msg = "CBSP:Main process ID in hb_common is %d", hbc_pid
_logger.info(msg)
- msg="CBSP:My parent process ID is %d",current_pid
+ msg = "CBSP:My parent process ID is %d", current_pid
_logger.info(msg)
- msg="CBSP:CBS Polling interval is %d", cbs_polling_interval
+ msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval
_logger.info(msg)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
- cbs_polling_interval = "30"
+ cbs_polling_interval = "30"
time.sleep(int(cbs_polling_interval))
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
- #connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
- #cur = connection_db.cursor()
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ # connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ # cur = connection_db.cursor()
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- result= True
- if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"):
+ result = True
+ if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"):
_logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION")
state = "RECONFIGURATION"
update_flg = 1
- db.create_update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name)
+ db.create_update_hb_common(update_flg, hbc_pid, state, user_name, password, ip_address, port_num, db_name)
else:
_logger.info("CBSP:Inactive instance or hb_common state is not RUNNING")
return result
+
+
if __name__ == "__main__":
current_pid = sys.argv[1]
- while(True):
+ while (True):
pollCBS(current_pid)
diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py
index 4266273..739cd2b 100644
--- a/miss_htbt_service/check_health.py
+++ b/miss_htbt_service/check_health.py
@@ -39,7 +39,7 @@ class GetHandler(BaseHTTPRequestHandler):
'sys_version=%s' % self.sys_version,
'protocol_version=%s' % self.protocol_version,
'',
- ])
+ ])
self.send_response(200)
self.end_headers()
self.wfile.write(bytes(message, 'utf-8'))
@@ -56,9 +56,11 @@ class GetHandler(BaseHTTPRequestHandler):
self.wfile.write(bytes(data['health'], 'utf-8'))
return
+
if __name__ == '__main__':
from http.server import HTTPServer
- #from BaseHTTPServer import HTTPServer
+
+ # from BaseHTTPServer import HTTPServer
server = HTTPServer(("", 10002), GetHandler)
print('Starting server at http://localhost:10002')
server.serve_forever()
diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py
index 913d8a5..087315d 100644
--- a/miss_htbt_service/config_notif.py
+++ b/miss_htbt_service/config_notif.py
@@ -36,27 +36,29 @@ import psycopg2
import mod.trapd_get_cbs_config
import mod.trapd_settings as tds
-hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
+hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
-def postgres_db_open(username,password,host,port,database_name):
+
+def postgres_db_open(username, password, host, port, database_name):
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
return True
try:
- connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+ connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
except Exception as e:
print("HB_Notif::postgress connect error: %s" % e)
connection = True
return connection
-def db_table_creation_check(connection_db,table_name):
+
+def db_table_creation_check(connection_db, table_name):
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
return True
cur = None
try:
cur = connection_db.cursor()
- query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+ query_db = "select * from information_schema.tables where table_name='%s'" % (table_name)
cur.execute(query_db)
database_names = cur.fetchone()
if (database_names is not None) and (table_name in database_names):
@@ -73,45 +75,49 @@ def db_table_creation_check(connection_db,table_name):
if cur:
cur.close()
+
def commit_and_close_db(connection_db):
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
return True
try:
- connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
return True
except psycopg2.DatabaseError as e:
return False
+
def read_hb_properties_default():
- #Read the hbproperties.yaml for postgress and CBS related data
- s=open(hb_properties_file, 'r')
- a=yaml.full_load(s)
- if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
- else:
- ip_address = os.getenv('pg_ipAddress')
- port_num = os.getenv('pg_portNum')
- user_name = os.getenv('pg_userName')
- password = os.getenv('pg_passwd')
+ # Read the hbproperties.yaml for postgress and CBS related data
+ s = open(hb_properties_file, 'r')
+ a = yaml.full_load(s)
+ if ((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (
+ os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
+ ip_address = a['pg_ipAddress']
+ port_num = a['pg_portNum']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
+ else:
+ ip_address = os.getenv('pg_ipAddress')
+ port_num = os.getenv('pg_portNum')
+ user_name = os.getenv('pg_userName')
+ password = os.getenv('pg_passwd')
+
+ dbName = a['pg_dbName']
+ db_name = dbName.lower()
+ cbs_polling_required = a['CBS_polling_allowed']
+ cbs_polling_interval = a['CBS_polling_interval']
+ s.close()
+ # TODO: there is a mismatch here between read_hb_properties_default and read_hb_properties.
+ # read_hb_properties() forces all of the variables returned here to be strings, while the code here does not.
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
- dbName = a['pg_dbName']
- db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
- s.close()
- # TODO: there is a mismatch here between read_hb_properties_default and read_hb_properties.
- # read_hb_properties() forces all of the variables returned here to be strings, while the code here does not.
- return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
def read_hb_properties(jsfile):
try:
with open(jsfile, 'r') as outfile:
- cfg = json.load(outfile)
+ cfg = json.load(outfile)
except(Exception) as err:
print("Json file read error - %s" % err)
return read_hb_properties_default()
@@ -125,13 +131,14 @@ def read_hb_properties(jsfile):
cbs_polling_required = str(cfg['CBS_polling_allowed'])
cbs_polling_interval = str(cfg['CBS_polling_interval'])
if "SERVICE_NAME" in cfg:
- os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
except(Exception) as err:
print("Json file read parameter error - %s" % err)
return read_hb_properties_default()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
-def read_hb_common(user_name,password,ip_address,port_num,db_name):
+
+def read_hb_common(user_name, password, ip_address, port_num, db_name):
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
hbc_pid = 10
@@ -139,7 +146,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
hbc_time = 1541234567
hbc_state = "RUNNING"
return hbc_pid, hbc_state, hbc_srcName, hbc_time
- connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;"
cur.execute(query_value)
@@ -154,22 +161,25 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
cur.close()
return hbc_pid, hbc_state, hbc_srcName, hbc_time
-def update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
+
+def update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
return True
- connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
- query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state)
+ query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % (
+ process_id, source_name, current_time, state)
cur.execute(query_value)
commit_and_close_db(connection_db)
cur.close()
return True
-def fetch_json_file(download_json = "../etc/download1.json", config_json = "../etc/config.json"):
+
+def fetch_json_file(download_json="../etc/download1.json", config_json="../etc/config.json"):
# use the fully qualified name here to let monkeypatching work
# if get_cbs_config():
if mod.trapd_get_cbs_config.get_cbs_config():
@@ -188,31 +198,33 @@ def fetch_json_file(download_json = "../etc/download1.json", config_json = "../e
print("Config_N: The json file is - %s" % jsfile)
return jsfile
-#if __name__ == "__main__":
+
+# if __name__ == "__main__":
def config_notif_run():
- jsfile = fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
- envPytest = os.getenv('pytest', "")
- if (envPytest == 'test'):
+ jsfile = fetch_json_file()
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(
+ jsfile)
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
return True
- connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
- cur = connection_db.cursor()
- if(db_table_creation_check(connection_db,"hb_common") == False):
- print("HB_Notif::ERROR::hb_common table not exists - No config download")
- connection_db.close()
- else:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
- state = "RECONFIGURATION"
- update_flg = 1
- ret = update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name)
- # TODO: There is no way for update_hb_common() to return false
- if (ret == True):
- print("HB_Notif::hb_common table updated with RECONFIGURATION state")
- commit_and_close_db(connection_db)
- return True
- else:
- print("HB_Notif::Failure updating hb_common table")
- commit_and_close_db(connection_db)
- return False
-
- cur.close()
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ cur = connection_db.cursor()
+ if (db_table_creation_check(connection_db, "hb_common") == False):
+ print("HB_Notif::ERROR::hb_common table not exists - No config download")
+ connection_db.close()
+ else:
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ state = "RECONFIGURATION"
+ update_flg = 1
+ ret = update_hb_common(update_flg, hbc_pid, state, user_name, password, ip_address, port_num, db_name)
+ # TODO: There is no way for update_hb_common() to return false
+ if (ret == True):
+ print("HB_Notif::hb_common table updated with RECONFIGURATION state")
+ commit_and_close_db(connection_db)
+ return True
+ else:
+ print("HB_Notif::Failure updating hb_common table")
+ commit_and_close_db(connection_db)
+ return False
+
+ cur.close()
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index df1bae7..708a2bd 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -35,56 +35,58 @@ import get_logger
_logger = get_logger.get_logger(__name__)
-def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target):
- msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type
+
+def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time,
+ closed_control_loop_name, version, target):
+ msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type
_logger.info(msg)
- if(CLType == "ONSET"):
+ if (CLType == "ONSET"):
_logger.info("DBM:Heartbeat not received, raising alarm event")
- if(target_type == "VNF"):
+ if (target_type == "VNF"):
json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": { "generic-vnf.vnf-name": srcName} ,
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- });
- elif(target_type == "VM"):
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ elif (target_type == "VM"):
json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": { "vserver.vserver-name": srcName} ,
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- });
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
else:
return True
- elif(CLType == "ABATED"):
+ elif (CLType == "ABATED"):
_logger.info("DBM:Heartbeat received, clearing alarm event")
- #last_date_time = datetime.datetime.now()
- if(target_type == "VNF"):
+ # last_date_time = datetime.datetime.now()
+ if (target_type == "VNF"):
json_object = json.dumps({
"closedLoopEventClient": "DCAE_Heartbeat_MS",
"policyVersion": policy_version,
"policyName": policy_name,
"policyScope": policy_scope,
"target_type": target_type,
- "AAI": { "generic-vnf.vnf-name": srcName} ,
+ "AAI": {"generic-vnf.vnf-name": srcName},
"closedLoopAlarmStart": epoc_time,
"closedLoopEventStatus": "ABATED",
"closedLoopControlName": closed_control_loop_name,
@@ -92,15 +94,15 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s
"target": target,
"requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
"from": "DCAE"
- });
- elif(target_type == "VM"):
+ });
+ elif (target_type == "VM"):
json_object = json.dumps({
"closedLoopEventClient": "DCAE_Heartbeat_MS",
"policyVersion": policy_version,
"policyName": policy_name,
"policyScope": policy_scope,
"target_type": target_type,
- "AAI": { "vserver.vserver-name": srcName} ,
+ "AAI": {"vserver.vserver-name": srcName},
"closedLoopAlarmStart": epoc_time,
"closedLoopEventStatus": "ABATED",
"closedLoopControlName": closed_control_loop_name,
@@ -108,54 +110,55 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s
"target": target,
"requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
"from": "DCAE"
- });
+ });
else:
return True
else:
return True
payload = json_object
- msg="DBM: CL Json object is", json_object
+ msg = "DBM: CL Json object is", json_object
_logger.info(msg)
- #psend_url = pol_url+'DefaultGroup/1?timeout=15000'
+ # psend_url = pol_url+'DefaultGroup/1?timeout=15000'
psend_url = pol_url
- msg="DBM:",psend_url
+ msg = "DBM:", psend_url
_logger.info(msg)
- #Send response for policy on output topic
+ # Send response for policy on output topic
try:
r = requests.post(psend_url, data=payload)
- msg="DBM:",r.status_code, r.reason
+ msg = "DBM:", r.status_code, r.reason
_logger.info(msg)
ret = r.status_code
- msg="DBM:Status code for sending the control loop event is",ret
+ msg = "DBM:Status code for sending the control loop event is", ret
_logger.info(msg)
except(Exception) as err:
- msg='Message send failure : ', err
+ msg = 'Message send failure : ', err
_logger.error(msg)
return True
-def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name):
- while(True):
+
+def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name):
+ while (True):
time.sleep(20)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
- break
+ break
try:
- with open(json_file, 'r') as outfile:
- cfg = json.load(outfile)
- pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
+ with open(json_file, 'r') as outfile:
+ cfg = json.load(outfile)
+ pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
except(Exception) as err:
- msg='Json file process error : ', err
- _logger.error(msg)
- continue
+ msg = 'Json file process error : ', err
+ _logger.error(msg)
+ continue
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
- if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"):
+ if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"):
_logger.info("DBM: Active DB Monitoring Instance")
db_query = "Select event_name from vnf_table_1"
cur.execute(db_query)
@@ -165,11 +168,11 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d
cur.execute(query_value)
rows = cur.fetchall()
hbc_state = rows[0][0]
- if( hbc_state == "RECONFIGURATION"):
+ if (hbc_state == "RECONFIGURATION"):
_logger.info("DBM:Waiting for hb_common state to become RUNNING")
break
- db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name)
+ db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" % (event_name)
cur.execute(db_query)
rows = cur.fetchall()
validity_flag = rows[0][0]
@@ -183,59 +186,64 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d
target_type = rows[0][8]
target = rows[0][9]
version = rows[0][10]
- comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000
- if (validity_flag ==1):
+ comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000
+ if (validity_flag == 1):
for source_name_key in range(source_name_count):
- epoc_time = int(round(time.time()*1000))
- epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1))
+ epoc_time = int(round(time.time() * 1000))
+ epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (event_name, (source_name_key + 1))
cur.execute(epoc_query)
row = cur.fetchall()
- if (len(row)==0):
+ if (len(row) == 0):
continue
epoc_time_sec = row[0][0]
srcName = row[0][1]
cl_flag = row[0][2]
- if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0):
- sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target)
+ if ((epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0):
+ sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope,
+ target_type, srcName, epoc_time, closed_control_loop_name, version,
+ target)
cl_flag = 1
- update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1))
cur.execute(update_query)
connection_db.commit()
- elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1):
- sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target)
+ elif ((epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1):
+ sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope,
+ target_type, srcName, epoc_time, closed_control_loop_name, version,
+ target)
cl_flag = 0
- update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1))
cur.execute(update_query)
connection_db.commit()
- else: #pragma: no cover
- msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name)
+ else: # pragma: no cover
+ msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % (event_name)
_logger.info(msg)
- delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name)
+ delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" % (event_name)
cur.execute(delete_query_table2)
- delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name)
+ delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" % (event_name)
cur.execute(delete_query)
connection_db.commit()
"""
Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2
"""
- else: #pragma: no cover
- msg="DBM:Inactive instance or hb_common state is not RUNNING"
+ else: # pragma: no cover
+ msg = "DBM:Inactive instance or hb_common state is not RUNNING"
_logger.info(msg)
pm.commit_and_close_db(connection_db)
cur.close()
break;
+
if __name__ == "__main__":
_logger.info("DBM: DBM Process started")
current_pid = sys.argv[1]
jsfile = sys.argv[2]
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
- msg="DBM:Parent process ID and json file name",current_pid, jsfile
+ msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
while (True):
- db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name)
+ db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
- break
+ break
diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py
index c94e333..bc8fea0 100644
--- a/miss_htbt_service/get_logger.py
+++ b/miss_htbt_service/get_logger.py
@@ -20,23 +20,22 @@ import logging.handlers
'''Configures the module root logger'''
root = logging.getLogger()
if root.handlers:
- #root.handlers.clear()
+ # root.handlers.clear()
del root.handlers[:]
formatter = logging.Formatter('%(asctime)s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root.addHandler(handler)
-fhandler = logging.handlers.RotatingFileHandler('./hb_logs.txt', maxBytes=(1048576*5), backupCount=10)
+fhandler = logging.handlers.RotatingFileHandler('./hb_logs.txt', maxBytes=(1048576 * 5), backupCount=10)
fhandler.setFormatter(formatter)
root.addHandler(fhandler)
root.setLevel("DEBUG")
+
class BadEnviornmentENVNotFound(Exception):
pass
+
def get_logger(module=None):
'''Returns a module-specific logger or global logger if the module is None'''
return root if module is None else root.getChild(module)
-
-
-
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index a2ffeca..5fa4074 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -35,90 +35,92 @@ import get_logger
_logger = get_logger.get_logger(__name__)
+
def read_json_file(i, prefix="../../tests"):
- if (i==0):
- with open (path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile:
- cfg = json.load(outfile)
+ if (i == 0):
+ with open(path.abspath(path.join(__file__, f"{prefix}/test1.json")), "r") as outfile:
+ cfg = json.load(outfile)
elif (i == 1):
- with open (path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
- cfg = json.load(outfile)
- elif (i ==2):
- with open( path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile:
- cfg = json.load(outfile)
+ with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
+ cfg = json.load(outfile)
+ elif (i == 2):
+ with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile:
+ cfg = json.load(outfile)
return cfg
+
def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
global mr_url
- i=0
+ i = 0
sleep_duration = 20
- while(True):
+ while (True):
time.sleep(sleep_duration)
with open(jsfile, 'r') as outfile:
cfg = json.load(outfile)
mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
- while(True):
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
- if(hbc_state == "RECONFIGURATION"):
+ while (True):
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ if (hbc_state == "RECONFIGURATION"):
_logger.info("HBT:Waiting for hb_common state to become RUNNING")
time.sleep(10)
else:
break
- if(os.getenv('pytest', "") == 'test'):
- eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"]
- connection_db = 0
+ if (os.getenv('pytest', "") == 'test'):
+ eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
+ connection_db = 0
else:
- connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
- cur = connection_db.cursor()
- db_query = "Select event_name from vnf_table_1"
- cur.execute(db_query)
- eventnameList = [item[0] for item in cur.fetchall()]
- msg="\n\nHBT:eventnameList values ", eventnameList
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ cur = connection_db.cursor()
+ db_query = "Select event_name from vnf_table_1"
+ cur.execute(db_query)
+ eventnameList = [item[0] for item in cur.fetchall()]
+ msg = "\n\nHBT:eventnameList values ", eventnameList
_logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
- get_url = mr_url + '/DefaultGroup/1?timeout=15000'
+ get_url = mr_url + '/DefaultGroup/1?timeout=15000'
else:
- get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000'
- msg="HBT:Getting :"+get_url
+ get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000'
+ msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if(os.getenv('pytest', "") == 'test'):
- jsonobj = read_json_file(i)
- jobj = []
- jobj.append(jsonobj)
- i=i+1
- msg="HBT:newly received test message", jobj
- _logger.info(msg)
- if (i >= 3):
- i=0
- break
+ if (os.getenv('pytest', "") == 'test'):
+ jsonobj = read_json_file(i)
+ jobj = []
+ jobj.append(jsonobj)
+ i = i + 1
+ msg = "HBT:newly received test message", jobj
+ _logger.info(msg)
+ if (i >= 3):
+ i = 0
+ break
else:
- res = requests.get(get_url)
- msg="HBT:",res.text
- _logger.info(msg)
- inputString = res.text
- #If mrstatus in message body indicates some information, not json msg.
- if ("mrstatus" in inputString):
- continue
- jlist = inputString.split('\n');
- # Process the DMaaP input message retreived
- error = False
- for line in jlist:
- try:
- jobj = json.loads(line)
- except ValueError:
- msg='HBT:Decoding JSON has failed'
- _logger.error(msg)
- error = True
- break
- if (error == True):
- continue
- if len(jobj) == 0:
- continue
+ res = requests.get(get_url)
+ msg = "HBT:", res.text
+ _logger.info(msg)
+ inputString = res.text
+ # If mrstatus in message body indicates some information, not json msg.
+ if ("mrstatus" in inputString):
+ continue
+ jlist = inputString.split('\n');
+ # Process the DMaaP input message retreived
+ error = False
+ for line in jlist:
+ try:
+ jobj = json.loads(line)
+ except ValueError:
+ msg = 'HBT:Decoding JSON has failed'
+ _logger.error(msg)
+ error = True
+ break
+ if (error == True):
+ continue
+ if len(jobj) == 0:
+ continue
for item in jobj:
try:
- if(os.getenv('pytest', "") == 'test'):
+ if (os.getenv('pytest', "") == 'test'):
jitem = jsonobj
else:
jitem = json.loads(item)
@@ -127,91 +129,95 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
seqnum = (jitem['event']['commonEventHeader']['sequence'])
eventName = (jitem['event']['commonEventHeader']['eventName'])
except(Exception) as err:
- msg = "HBT message process error - ",err
+ msg = "HBT message process error - ", err
_logger.error(msg)
continue
- msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname
+ msg = "HBT:Newly received HB event values ::", eventName, lastepo, srcname
_logger.info(msg)
- if(db_table_creation_check(connection_db,"vnf_table_2") ==False):
- msg="HBT:Creating vnf_table_2"
- _logger.info(msg)
- cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);")
+ if (db_table_creation_check(connection_db, "vnf_table_2") == False):
+ msg = "HBT:Creating vnf_table_2"
+ _logger.info(msg)
+ cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);")
else:
- msg="HBT:vnf_table_2 is already there"
- _logger.info(msg)
- if(eventName in eventnameList): #pragma: no cover
- db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName)
- msg="HBT:",db_query
+ msg = "HBT:vnf_table_2 is already there"
+ _logger.info(msg)
+ if (eventName in eventnameList): # pragma: no cover
+ db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" % (eventName)
+ msg = "HBT:", db_query
+ _logger.info(msg)
+ if (os.getenv('pytest', "") == 'test'):
+ break
+ cur.execute(db_query)
+ row = cur.fetchone()
+ source_name_count = row[0]
+ source_name_key = source_name_count + 1
+ cl_flag = 0
+ if (source_name_count == 0): # pragma: no cover
+ msg = "HBT: Insert entry in table_2,source_name_count=0 : ", row
_logger.info(msg)
- if(os.getenv('pytest', "") == 'test'):
- break
- cur.execute(db_query)
- row = cur.fetchone()
- source_name_count = row[0]
- source_name_key = source_name_count+1
- cl_flag = 0
- if(source_name_count==0): #pragma: no cover
- msg="HBT: Insert entry in table_2,source_name_count=0 : ",row
- _logger.info(msg)
- query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
- cur.execute(query_value)
- update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
- cur.execute(update_query)
- else: #pragma: no cover
- msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count
- _logger.info(msg)
- for source_name_key in range(source_name_count):
- epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1))
- msg="HBT:eppc query is",epoc_query
- _logger.info(msg)
- cur.execute(epoc_query)
- row = cur.fetchall()
- if (len(row)==0):
- continue
- db_srcname = row[0][0]
- if (db_srcname == srcname):
- msg="HBT: Update vnf_table_2 : ",source_name_key, row
- _logger.info(msg)
- update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1))
- cur.execute(update_query)
- source_name_key = source_name_count
- break
- else:
- continue
- msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count
+ query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % (
+ eventName, source_name_key, lastepo, srcname, cl_flag)
+ cur.execute(query_value)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % (
+ source_name_key, eventName)
+ cur.execute(update_query)
+ else: # pragma: no cover
+ msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count
+ _logger.info(msg)
+ for source_name_key in range(source_name_count):
+ epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (eventName, (source_name_key + 1))
+ msg = "HBT:eppc query is", epoc_query
_logger.info(msg)
- if (source_name_count == (source_name_key+1)):
- source_name_key = source_name_count+1
- msg="HBT: Insert entry in table_2 : ",row
+ cur.execute(epoc_query)
+ row = cur.fetchall()
+ if (len(row) == 0):
+ continue
+ db_srcname = row[0][0]
+ if (db_srcname == srcname):
+ msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
- cur.execute(insert_query)
- update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+ update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" % (lastepo, srcname, eventName, (source_name_key + 1))
cur.execute(update_query)
+ source_name_key = source_name_count
+ break
+ else:
+ continue
+ msg = "HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count
+ _logger.info(msg)
+ if (source_name_count == (source_name_key + 1)):
+ source_name_key = source_name_count + 1
+ msg = "HBT: Insert entry in table_2 : ", row
+ _logger.info(msg)
+ insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" % (
+ eventName, source_name_key, lastepo, srcname, cl_flag)
+ cur.execute(insert_query)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" % (source_name_key, eventName)
+ cur.execute(update_query)
else:
- _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
+ _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
commit_db(connection_db)
commit_and_close_db(connection_db)
- if(os.getenv('pytest', "") != 'test'):
- cur.close()
+ if (os.getenv('pytest', "") != 'test'):
+ cur.close()
-def postgres_db_open(username,password,host,port,database_name):
- if(os.getenv('pytest', "") == 'test'):
+def postgres_db_open(username, password, host, port, database_name):
+ if (os.getenv('pytest', "") == 'test'):
return True
- connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+ connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
return connection
-def db_table_creation_check(connection_db,table_name):
- if(os.getenv('pytest', "") == 'test'):
+
+def db_table_creation_check(connection_db, table_name):
+ if (os.getenv('pytest', "") == 'test'):
return True
try:
cur = connection_db.cursor()
- query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+ query_db = "select * from information_schema.tables where table_name='%s'" % (table_name)
cur.execute(query_db)
database_names = cur.fetchone()
- if(database_names is not None):
- if(table_name in database_names):
+ if (database_names is not None):
+ if (table_name in database_names):
return True
else:
return False
@@ -223,22 +229,24 @@ def db_table_creation_check(connection_db,table_name):
finally:
cur.close()
+
def commit_db(connection_db):
- if(os.getenv('pytest', "") == 'test'):
+ if (os.getenv('pytest', "") == 'test'):
return True
try:
- connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.commit() # <--- makes sure the change is shown in the database
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = 'COMMON:Error %s' % e
_logger.error(msg)
return False
+
def commit_and_close_db(connection_db):
- if(os.getenv('pytest', "") == 'test'):
+ if (os.getenv('pytest', "") == 'test'):
return True
try:
- connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
return True
except psycopg2.DatabaseError as e:
@@ -246,11 +254,12 @@ def commit_and_close_db(connection_db):
_logger.error(msg)
return False
+
if __name__ == '__main__':
jsfile = sys.argv[1]
- msg="HBT:HeartBeat thread Created"
+ msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
- msg="HBT:The config file name passed is -%s", jsfile
+ msg = "HBT:The config file name passed is -%s", jsfile
_logger.info(msg)
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties(jsfile)
- process_msg(jsfile,user_name, password, ip_address, port_num, db_name)
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ process_msg(jsfile, user_name, password, ip_address, port_num, db_name)
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 868020c..12d439a 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -44,21 +44,22 @@ import get_logger
from mod import trapd_settings as tds
from mod.trapd_get_cbs_config import get_cbs_config
-hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
+hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py"))
ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py"))
ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py"))
ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py"))
+
def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
from psycopg2 import connect
try:
- con = connect(user=user_name, host = ip_address, password = password)
+ con = connect(user=user_name, host=ip_address, password=password)
database_name = db_name
con.autocommit = True
cur = con.cursor()
- query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name)
+ query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" % (database_name)
cur.execute(query_value)
not_exists_row = cur.fetchone()
msg = "MSHBT:Create_database:DB not exists? ", not_exists_row
@@ -66,7 +67,7 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
not_exists = not_exists_row[0]
if not_exists is True:
_logger.info("MSHBT:Creating database ...")
- query_value = "CREATE DATABASE %s" %(database_name)
+ query_value = "CREATE DATABASE %s" % (database_name)
cur.execute(query_value)
else:
_logger.info("MSHBD:Database already exists")
@@ -81,16 +82,17 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
cur.close()
con.close()
except(Exception) as err:
- msg = "MSHBD:DB Creation -",err
+ msg = "MSHBD:DB Creation -", err
_logger.error(msg)
-#def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
+
+# def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
# with open(jsfile, 'r') as outfile:
# cfg = json.load(outfile)
# mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
# pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
-def read_hb_common(user_name,password,ip_address,port_num,db_name):
+def read_hb_common(user_name, password, ip_address, port_num, db_name):
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
hbc_pid = 10
@@ -98,7 +100,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
hbc_time = 1584595881
hbc_state = "RUNNING"
return hbc_pid, hbc_state, hbc_srcName, hbc_time
- connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;"
cur.execute(query_value)
@@ -111,27 +113,29 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
cur.close()
return hbc_pid, hbc_state, hbc_srcName, hbc_time
-def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
+
+def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
envPytest = os.getenv('pytest', "")
if (envPytest != 'test'):
- connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
- cur = connection_db.cursor()
- if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False):
- cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);")
- query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state)
- _logger.info("MSHBT:Created hb_common DB and updated new values")
- cur.execute(query_value)
- if(update_flg == 1):
- query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state)
- _logger.info("MSHBT:Updated hb_common DB with new values")
- cur.execute(query_value)
- heartbeat.commit_and_close_db(connection_db)
- cur.close()
-
-def create_update_vnf_table_1(jsfile,update_db,connection_db):
+ connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ cur = connection_db.cursor()
+ if (heartbeat.db_table_creation_check(connection_db, "hb_common") == False):
+ cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);")
+ query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" % (process_id, source_name, current_time, state)
+ _logger.info("MSHBT:Created hb_common DB and updated new values")
+ cur.execute(query_value)
+ if (update_flg == 1):
+ query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" % (process_id, source_name, current_time, state)
+ _logger.info("MSHBT:Updated hb_common DB with new values")
+ cur.execute(query_value)
+ heartbeat.commit_and_close_db(connection_db)
+ cur.close()
+
+
+def create_update_vnf_table_1(jsfile, update_db, connection_db):
with open(jsfile, 'r') as outfile:
cfg = json.load(outfile)
hbcfg = cfg['heartbeat_config']
@@ -141,10 +145,10 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
else:
cur = connection_db.cursor()
- if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
+ if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False):
cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);")
_logger.info("MSHBT:Created vnf_table_1 table")
- if(update_db == 1):
+ if (update_db == 1):
query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;"
cur.execute(query_value)
_logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table")
@@ -154,7 +158,7 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
vnf_list = [item[0] for item in cur.fetchall()]
for vnf in (jhbcfg['vnfs']):
nfc = vnf['eventName']
- #_logger.error("MSHBT:",nfc)
+ # _logger.error("MSHBT:",nfc)
validity_flag = 1
source_name_count = 0
missed = vnf['heartbeatcountmissed']
@@ -167,62 +171,69 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
target = vnf['target']
version = vnf['version']
- if(nfc not in vnf_list):
- query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag)
+ if (nfc not in vnf_list):
+ query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" % (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, source_name_count, validity_flag)
else:
- query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc)
+ query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" % (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, validity_flag, nfc)
if (envPytest != 'test'):
cur.execute(query_value)
- #heartbeat.commit_and_close_db(connection_db)
+ # heartbeat.commit_and_close_db(connection_db)
if (envPytest != 'test'):
cur.close()
_logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
+
def hb_cbs_polling_process(pid_current):
- subprocess.call([ABSOLUTE_PATH4 , str(pid_current) ])
- sys.stdout.flush()
- _logger.info("MSHBT:Creaated CBS polling process")
- return
+ subprocess.call([ABSOLUTE_PATH4, str(pid_current)])
+ sys.stdout.flush()
+ _logger.info("MSHBT:Creaated CBS polling process")
+ return
+
+
def hb_worker_process(config_file_path):
- subprocess.call([ABSOLUTE_PATH1 , config_file_path ])
- sys.stdout.flush()
- _logger.info("MSHBT:Creaated Heartbeat worker process")
- return
-
-def db_monitoring_process(current_pid,jsfile):
- subprocess.call([ABSOLUTE_PATH2 , str(current_pid),jsfile])
- sys.stdout.flush()
- _logger.info("MSHBT:Creaated DB Monitoring process")
- return
+ subprocess.call([ABSOLUTE_PATH1, config_file_path])
+ sys.stdout.flush()
+ _logger.info("MSHBT:Creaated Heartbeat worker process")
+ return
+
+
+def db_monitoring_process(current_pid, jsfile):
+ subprocess.call([ABSOLUTE_PATH2, str(current_pid), jsfile])
+ sys.stdout.flush()
+ _logger.info("MSHBT:Creaated DB Monitoring process")
+ return
+
+
def read_hb_properties_default():
- #Read the hbproperties.yaml for postgress and CBS related data
- s=open(hb_properties_file, 'r')
- a=yaml.full_load(s)
-
- if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
- else:
- ip_address = os.getenv('pg_ipAddress')
- port_num = os.getenv('pg_portNum')
- user_name = os.getenv('pg_userName')
- password = os.getenv('pg_passwd')
+ # Read the hbproperties.yaml for postgress and CBS related data
+ s = open(hb_properties_file, 'r')
+ a = yaml.full_load(s)
+
+ if ((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
+ ip_address = a['pg_ipAddress']
+ port_num = a['pg_portNum']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
+ else:
+ ip_address = os.getenv('pg_ipAddress')
+ port_num = os.getenv('pg_portNum')
+ user_name = os.getenv('pg_userName')
+ password = os.getenv('pg_passwd')
+
+ dbName = a['pg_dbName']
+ db_name = dbName.lower()
+ cbs_polling_required = a['CBS_polling_allowed']
+ cbs_polling_interval = a['CBS_polling_interval']
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
- dbName = a['pg_dbName']
- db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
- s.close()
- return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
def read_hb_properties(jsfile):
try:
with open(jsfile, 'r') as outfile:
- cfg = json.load(outfile)
+ cfg = json.load(outfile)
except(Exception) as err:
- msg = "CBS Json file load error - ",err
+ msg = "CBS Json file load error - ", err
_logger.error(msg)
return read_hb_properties_default()
@@ -235,21 +246,22 @@ def read_hb_properties(jsfile):
db_name = dbName.lower()
cbs_polling_required = str(cfg['CBS_polling_allowed'])
cbs_polling_interval = str(cfg['CBS_polling_interval'])
- consumer_id = str(cfg['consumerID'])
- group_id = str(cfg['groupID'])
+ consumer_id = str(cfg['consumerID'])
+ group_id = str(cfg['groupID'])
os.environ['consumerID'] = consumer_id
os.environ['groupID'] = group_id
if "SERVICE_NAME" in cfg:
- os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
except(Exception) as err:
- msg = "CBS Json file read parameter error - ",err
+ msg = "CBS Json file read parameter error - ", err
_logger.error(msg)
return read_hb_properties_default()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
def fetch_json_file():
if get_cbs_config():
- #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
+ # current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json"
@@ -260,11 +272,11 @@ def fetch_json_file():
with open(current_runtime_config_file_name, 'w') as outfile:
json.dump(tds.c_config, outfile)
if os.getenv('pytest', "") == 'test':
- jsfile = current_runtime_config_file_name
+ jsfile = current_runtime_config_file_name
else:
- jsfile = "../etc/config.json"
- os.system('cp ../etc/download.json ../etc/config.json')
- os.remove("../etc/download.json")
+ jsfile = "../etc/config.json"
+ os.system('cp ../etc/download.json ../etc/config.json')
+ os.remove("../etc/download.json")
else:
msg = "MSHBD:CBS Config not available, using local config"
_logger.warning(msg)
@@ -277,39 +289,43 @@ def fetch_json_file():
_logger.info(msg)
return jsfile
+
def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
envPytest = os.getenv('pytest', "")
- if (envPytest != 'test'): #pragma: no cover
- if(update_db == 0):
- create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+ if (envPytest != 'test'): # pragma: no cover
+ if (update_db == 0):
+ create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
_logger.info(msg)
- connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
- if(update_db == 0):
- if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
- create_update_vnf_table_1(jsfile,update_db,connection_db)
+ if (update_db == 0):
+ if (heartbeat.db_table_creation_check(connection_db, "vnf_table_1") == False):
+ create_update_vnf_table_1(jsfile, update_db, connection_db)
else:
- create_update_vnf_table_1(jsfile,update_db,connection_db)
+ create_update_vnf_table_1(jsfile, update_db, connection_db)
heartbeat.commit_and_close_db(connection_db)
cur.close()
+
def create_process(job_list, jsfile, pid_current):
- if(len(job_list) == 0):
- p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
+ if (len(job_list) == 0):
+ p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
time.sleep(1)
- p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,))
+ p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,))
p1.start()
time.sleep(1)
p2.start()
job_list.append(p1)
job_list.append(p2)
- msg = "MSHBD:jobs list is",job_list
+ msg = "MSHBD:jobs list is", job_list
_logger.info(msg)
return job_list
+
_logger = get_logger.get_logger(__name__)
+
def main():
try:
subprocess.Popen([ABSOLUTE_PATH3], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -320,109 +336,110 @@ def main():
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
_logger.info(msg)
- if(cbs_polling_required == 'True'):
- p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
- p3.start()
+ if (cbs_polling_required == 'True'):
+ p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
+ p3.start()
update_db = 0
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
state = "RECONFIGURATION"
update_flg = 0
- create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
- msg = "MSHBD:Current process id is",pid_current
+ create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ msg = "MSHBD:Current process id is", pid_current
_logger.info(msg)
_logger.info("MSHBD:Now be in a continuous loop")
- i=0
- while(True):
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
- msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time
- _logger.info(msg)
- current_time = int(round(time.time()))
- time_difference = current_time - hbc_time
- msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference
- _logger.info(msg)
- source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- envPytest = os.getenv('pytest', "")
- if (envPytest == 'test'):
- if i == 2:
- hbc_pid = pid_current
- source_name = hbc_srcName
- hbc_state = "RECONFIGURATION"
- elif (i>3):
- hbc_pid = pid_current
- source_name = hbc_srcName
- hbc_state = "RUNNING"
- if (time_difference <60):
- if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)):
- msg = "MSHBD:config status is",hbc_state
+ i = 0
+ while (True):
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time
+ _logger.info(msg)
+ current_time = int(round(time.time()))
+ time_difference = current_time - hbc_time
+ msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference
+ _logger.info(msg)
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ if i == 2:
+ hbc_pid = pid_current
+ source_name = hbc_srcName
+ hbc_state = "RECONFIGURATION"
+ elif (i > 3):
+ hbc_pid = pid_current
+ source_name = hbc_srcName
+ hbc_state = "RUNNING"
+ if (time_difference < 60):
+ if ((int(hbc_pid) == int(pid_current)) and (source_name == hbc_srcName)):
+ msg = "MSHBD:config status is", hbc_state
+ _logger.info(msg)
+ if (hbc_state == "RUNNING"):
+ state = "RUNNING"
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ elif (hbc_state == "RECONFIGURATION"):
+ _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+ jsfile = fetch_json_file()
+ update_db = 1
+ create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+ msg = "MSHBD: parameters passed to DBM and HB are %d and %s", pid_current
_logger.info(msg)
- if(hbc_state=="RUNNING"):
- state = "RUNNING"
- update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
- elif(hbc_state=="RECONFIGURATION"):
- _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
- jsfile = fetch_json_file()
- update_db = 1
- create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
- msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current
- _logger.info(msg)
- job_list = create_process(job_list, jsfile, pid_current)
- state = "RUNNING"
- update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
+ job_list = create_process(job_list, jsfile, pid_current)
+ state = "RUNNING"
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
- else:
- _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
- if(len(job_list)>=2):
- _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
- else:
- jsfile = fetch_json_file()
- msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current
- _logger.info(msg)
- job_list = create_process(job_list, jsfile, pid_current)
else:
- _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover")
- if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)):
- _logger.info("MSHBD:Initiating to become Active Instance")
- if(len(job_list)>=2):
- _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
- else:
- jsfile = fetch_json_file()
- msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current
- _logger.info(msg)
- job_list = create_process(job_list, jsfile, pid_current)
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
- update_flg = 1
- create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name)
+ _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
+ if (len(job_list) >= 2):
+ _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
+ else:
+ jsfile = fetch_json_file()
+ msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current
+ _logger.info(msg)
+ job_list = create_process(job_list, jsfile, pid_current)
+ else:
+ _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover")
+ if ((int(hbc_pid) != int(pid_current)) or (source_name != hbc_srcName)):
+ _logger.info("MSHBD:Initiating to become Active Instance")
+ if (len(job_list) >= 2):
+ _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
else:
- _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
- time.sleep(25)
- if os.getenv('pytest', "") == 'test':
- i = i + 1
- if(i > 5):
- _logger.info("Terminating main process for pytest")
- p3.terminate()
+ jsfile = fetch_json_file()
+ msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
+ _logger.info(msg)
+ job_list = create_process(job_list, jsfile, pid_current)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name)
+ else:
+ _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
+ time.sleep(25)
+ if os.getenv('pytest', "") == 'test':
+ i = i + 1
+ if (i > 5):
+ _logger.info("Terminating main process for pytest")
+ p3.terminate()
+ time.sleep(1)
+ p3.join()
+ if (len(job_list) > 0):
+ job_list[0].terminate()
time.sleep(1)
- p3.join()
- if(len(job_list)>0):
- job_list[0].terminate()
- time.sleep(1)
- job_list[0].join()
- job_list.remove(job_list[0])
- if(len(job_list)>0):
- job_list[0].terminate()
- time.sleep(1)
- job_list[0].join()
- job_list.remove(job_list[0])
- break
+ job_list[0].join()
+ job_list.remove(job_list[0])
+ if (len(job_list) > 0):
+ job_list[0].terminate()
+ time.sleep(1)
+ job_list[0].join()
+ job_list.remove(job_list[0])
+ break
except (Exception) as e:
- msg = "MSHBD:Exception as %s" %(str(traceback.format_exc()))
+ msg = "MSHBD:Exception as %s" % (str(traceback.format_exc()))
_logger.error(msg)
msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e)
_logger.error(msg)
+
if __name__ == '__main__':
main()