aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/db_monitoring.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/db_monitoring.py')
-rw-r--r--miss_htbt_service/db_monitoring.py172
1 files changed, 90 insertions, 82 deletions
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index df1bae7..708a2bd 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -35,56 +35,58 @@ import get_logger
_logger = get_logger.get_logger(__name__)
-def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target):
- msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type
+
+def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time,
+ closed_control_loop_name, version, target):
+ msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type
_logger.info(msg)
- if(CLType == "ONSET"):
+ if (CLType == "ONSET"):
_logger.info("DBM:Heartbeat not received, raising alarm event")
- if(target_type == "VNF"):
+ if (target_type == "VNF"):
json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": { "generic-vnf.vnf-name": srcName} ,
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- });
- elif(target_type == "VM"):
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ elif (target_type == "VM"):
json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": { "vserver.vserver-name": srcName} ,
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- });
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
else:
return True
- elif(CLType == "ABATED"):
+ elif (CLType == "ABATED"):
_logger.info("DBM:Heartbeat received, clearing alarm event")
- #last_date_time = datetime.datetime.now()
- if(target_type == "VNF"):
+ # last_date_time = datetime.datetime.now()
+ if (target_type == "VNF"):
json_object = json.dumps({
"closedLoopEventClient": "DCAE_Heartbeat_MS",
"policyVersion": policy_version,
"policyName": policy_name,
"policyScope": policy_scope,
"target_type": target_type,
- "AAI": { "generic-vnf.vnf-name": srcName} ,
+ "AAI": {"generic-vnf.vnf-name": srcName},
"closedLoopAlarmStart": epoc_time,
"closedLoopEventStatus": "ABATED",
"closedLoopControlName": closed_control_loop_name,
@@ -92,15 +94,15 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s
"target": target,
"requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
"from": "DCAE"
- });
- elif(target_type == "VM"):
+ });
+ elif (target_type == "VM"):
json_object = json.dumps({
"closedLoopEventClient": "DCAE_Heartbeat_MS",
"policyVersion": policy_version,
"policyName": policy_name,
"policyScope": policy_scope,
"target_type": target_type,
- "AAI": { "vserver.vserver-name": srcName} ,
+ "AAI": {"vserver.vserver-name": srcName},
"closedLoopAlarmStart": epoc_time,
"closedLoopEventStatus": "ABATED",
"closedLoopControlName": closed_control_loop_name,
@@ -108,54 +110,55 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_s
"target": target,
"requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
"from": "DCAE"
- });
+ });
else:
return True
else:
return True
payload = json_object
- msg="DBM: CL Json object is", json_object
+ msg = "DBM: CL Json object is", json_object
_logger.info(msg)
- #psend_url = pol_url+'DefaultGroup/1?timeout=15000'
+ # psend_url = pol_url+'DefaultGroup/1?timeout=15000'
psend_url = pol_url
- msg="DBM:",psend_url
+ msg = "DBM:", psend_url
_logger.info(msg)
- #Send response for policy on output topic
+ # Send response for policy on output topic
try:
r = requests.post(psend_url, data=payload)
- msg="DBM:",r.status_code, r.reason
+ msg = "DBM:", r.status_code, r.reason
_logger.info(msg)
ret = r.status_code
- msg="DBM:Status code for sending the control loop event is",ret
+ msg = "DBM:Status code for sending the control loop event is", ret
_logger.info(msg)
except(Exception) as err:
- msg='Message send failure : ', err
+ msg = 'Message send failure : ', err
_logger.error(msg)
return True
-def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name):
- while(True):
+
+def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name):
+ while (True):
time.sleep(20)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
- break
+ break
try:
- with open(json_file, 'r') as outfile:
- cfg = json.load(outfile)
- pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
+ with open(json_file, 'r') as outfile:
+ cfg = json.load(outfile)
+ pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
except(Exception) as err:
- msg='Json file process error : ', err
- _logger.error(msg)
- continue
+ msg = 'Json file process error : ', err
+ _logger.error(msg)
+ continue
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
source_name = socket.gethostname()
source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
- if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"):
+ if (int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING"):
_logger.info("DBM: Active DB Monitoring Instance")
db_query = "Select event_name from vnf_table_1"
cur.execute(db_query)
@@ -165,11 +168,11 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d
cur.execute(query_value)
rows = cur.fetchall()
hbc_state = rows[0][0]
- if( hbc_state == "RECONFIGURATION"):
+ if (hbc_state == "RECONFIGURATION"):
_logger.info("DBM:Waiting for hb_common state to become RUNNING")
break
- db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name)
+ db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" % (event_name)
cur.execute(db_query)
rows = cur.fetchall()
validity_flag = rows[0][0]
@@ -183,59 +186,64 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d
target_type = rows[0][8]
target = rows[0][9]
version = rows[0][10]
- comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000
- if (validity_flag ==1):
+ comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000
+ if (validity_flag == 1):
for source_name_key in range(source_name_count):
- epoc_time = int(round(time.time()*1000))
- epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1))
+ epoc_time = int(round(time.time() * 1000))
+ epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" % (event_name, (source_name_key + 1))
cur.execute(epoc_query)
row = cur.fetchall()
- if (len(row)==0):
+ if (len(row) == 0):
continue
epoc_time_sec = row[0][0]
srcName = row[0][1]
cl_flag = row[0][2]
- if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0):
- sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target)
+ if ((epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0):
+ sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope,
+ target_type, srcName, epoc_time, closed_control_loop_name, version,
+ target)
cl_flag = 1
- update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1))
cur.execute(update_query)
connection_db.commit()
- elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1):
- sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target)
+ elif ((epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1):
+ sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope,
+ target_type, srcName, epoc_time, closed_control_loop_name, version,
+ target)
cl_flag = 0
- update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" % (cl_flag, event_name, (source_name_key + 1))
cur.execute(update_query)
connection_db.commit()
- else: #pragma: no cover
- msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name)
+ else: # pragma: no cover
+ msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % (event_name)
_logger.info(msg)
- delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name)
+ delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" % (event_name)
cur.execute(delete_query_table2)
- delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name)
+ delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" % (event_name)
cur.execute(delete_query)
connection_db.commit()
"""
Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2
"""
- else: #pragma: no cover
- msg="DBM:Inactive instance or hb_common state is not RUNNING"
+ else: # pragma: no cover
+ msg = "DBM:Inactive instance or hb_common state is not RUNNING"
_logger.info(msg)
pm.commit_and_close_db(connection_db)
cur.close()
break;
+
if __name__ == "__main__":
_logger.info("DBM: DBM Process started")
current_pid = sys.argv[1]
jsfile = sys.argv[2]
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
- msg="DBM:Parent process ID and json file name",current_pid, jsfile
+ msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
while (True):
- db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name)
+ db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
- break
+ break