diff options
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/cbs_polling.py | 11 | ||||
-rw-r--r-- | miss_htbt_service/check_health.py | 4 | ||||
-rw-r--r-- | miss_htbt_service/config/hbproperties.yaml | 2 | ||||
-rw-r--r-- | miss_htbt_service/config_notif.py | 56 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 235 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 11 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 59 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_get_cbs_config.py | 10 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_vnf_table.py | 135 |
9 files changed, 368 insertions, 155 deletions
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py index 4212ab7..233cdb5 100644 --- a/miss_htbt_service/cbs_polling.py +++ b/miss_htbt_service/cbs_polling.py @@ -32,8 +32,8 @@ _logger = get_logger.get_logger(__name__) def pollCBS(current_pid): - - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties() + jsfile = db.fetch_json_file() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) msg="CBSP:Main process ID in hb_common is %d",hbc_pid _logger.info(msg) @@ -41,12 +41,15 @@ def pollCBS(current_pid): _logger.info(msg) msg="CBSP:CBS Polling interval is %d", cbs_polling_interval _logger.info(msg) - time.sleep(cbs_polling_interval) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + cbs_polling_interval = "30" + time.sleep(int(cbs_polling_interval)) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) #connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) #cur = connection_db.cursor() source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) result= True if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION") diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py index fb99584..5732749 100644 --- a/miss_htbt_service/check_health.py +++ b/miss_htbt_service/check_health.py @@ -62,6 +62,6 @@ class GetHandler(BaseHTTPRequestHandler): if __name__ == '__main__': from http.server import HTTPServer #from BaseHTTPServer import HTTPServer - server = HTTPServer(("", 10001), GetHandler) - print('Starting server at http://localhost:10001') + server = HTTPServer(("", 10002), GetHandler) + print('Starting server at http://localhost:10002') server.serve_forever() diff --git a/miss_htbt_service/config/hbproperties.yaml b/miss_htbt_service/config/hbproperties.yaml index b0806e4..ca33cc5 100644 --- a/miss_htbt_service/config/hbproperties.yaml +++ b/miss_htbt_service/config/hbproperties.yaml @@ -4,7 +4,7 @@ pg_ipAddress: 10.0.4.1 pg_portNum: 5432 pg_userName: postgres pg_passwd: postgres -pg_dbName: hb_vnf +pg_dbName: postgres #Periodic polling of CBS config download CBS_polling_allowed: True diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py index 242b0e9..3ca0ef0 100644 --- a/miss_htbt_service/config_notif.py +++ b/miss_htbt_service/config_notif.py @@ -23,9 +23,12 @@ import string import sys import socket import yaml +import json import psycopg2 from pathlib import Path import os.path as path +from mod.trapd_get_cbs_config import get_cbs_config +import mod.trapd_settings as tds hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) @@ -33,7 +36,11 @@ def postgres_db_open(username,password,host,port,database_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True - connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + try: + connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + except Exception as e: + print("HB_Notif::postgress connect error:", e) + connection = True return connection def db_table_creation_check(connection_db,table_name): @@ -68,7 +75,7 @@ def commit_and_close_db(connection_db): except(psycopg2.DatabaseError, e): return False -def read_hb_properties(): +def read_hb_properties_default(): #Read the hbproperties.yaml for postgress and CBS related data s=open(hb_properties_file, 'r') a=yaml.load(s) @@ -90,6 +97,29 @@ def read_hb_properties(): s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval +def read_hb_properties(jsfile): + try: + with open(jsfile, 'r') as outfile: + cfg = json.load(outfile) + except(Exception) as err: + print("Json file read error - %s",err) + return read_hb_properties_default() + try: + ip_address = str(cfg['pg_ipAddress']) + port_num = str(cfg['pg_portNum']) + user_name = str(cfg['pg_userName']) + password = str(cfg['pg_passwd']) + dbName = str(cfg['pg_dbName']) + db_name = dbName.lower() + cbs_polling_required = str(cfg['CBS_polling_allowed']) + cbs_polling_interval = str(cfg['CBS_polling_interval']) + if("SERVICE_NAME" in cfg.keys()): + os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + except(Exception) as err: + print("Json file read parameter error -%s ",err) + return read_hb_properties_default() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + def read_hb_common(user_name,password,ip_address,port_num,db_name): envPytest = os.getenv('pytest', "") if (envPytest == 'test'): @@ -115,7 +145,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): def update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): current_time = int(round(time.time())) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True @@ -127,9 +157,27 @@ def update_hb_common(update_flg, process_id, state, user_name,password,ip_addres cur.close() return True +def fetch_json_file(): + if get_cbs_config(): + current_runtime_config_file_name = "../etc/download1.json" + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + jsfile = "../etc/config.json" + return jsfile + print("Config_N:current config logged to : %s" % current_runtime_config_file_name) + with open(current_runtime_config_file_name, 'w') as outfile: + json.dump(tds.c_config, outfile) + jsfile = current_runtime_config_file_name + else: + print("MSHBD:CBS Config not available, using local config") + jsfile = "../etc/config.json" + print("Config_N: The json file is - %s", jsfile) + return jsfile + #if __name__ == "__main__": def config_notif_run(): - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties() + jsfile = fetch_json_file() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): return True diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index 6113be2..b435f2a 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -34,19 +34,123 @@ import get_logger _logger = get_logger.get_logger(__name__) +def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target): + msg="DBM:Time to raise Control Loop Event for Control loop typ /target type - ",CLType, target_type + _logger.info(msg) + if(CLType == "ONSET"): + _logger.info("DBM:Heartbeat not received, raising alarm event") + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + return True + elif(CLType == "ABATED"): + _logger.info("DBM:Heartbeat received, clearing alarm event") + #last_date_time = datetime.datetime.now() + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + return True + else: + return True + payload = json_object + msg="DBM: CL Json object is", json_object + _logger.info(msg) + #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + psend_url = pol_url + msg="DBM:",psend_url + _logger.info(msg) + #Send response for policy on output topic + try: + r = requests.post(psend_url, data=payload) + msg="DBM:",r.status_code, r.reason + _logger.info(msg) + ret = r.status_code + msg="DBM:Status code for sending the control loop event is",ret + _logger.info(msg) + except(Exception) as err: + msg='Message send failure : ', err + _logger.error(msg) + return True + def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name): while(True): time.sleep(20) - with open(json_file, 'r') as outfile: - cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): break + + try: + with open(json_file, 'r') as outfile: + cfg = json.load(outfile) + pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + except(Exception) as err: + msg='Json file process error : ', err + _logger.error(msg) + + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) cur = connection_db.cursor() if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): @@ -89,119 +193,14 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d epoc_time_sec = row[0][0] srcName = row[0][1] cl_flag = row[0][2] - vnfName = event_name if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): - msg="DBM:Time to raise Control Loop Event for target type - ", target_type - _logger.info(msg) - if(target_type == "VNF"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - else: - continue - payload = json_object - msg="DBM: CL Json object is", json_object - _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' - psend_url = pol_url - msg="DBM:",psend_url - _logger.info(msg) - msg="DBM:DB monitoring raising alarm event "+psend_url - _logger.info(msg) - #Send response for policy on output topic - r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason - _logger.info(msg) - ret = r.status_code - msg="DBM:Status code after raising the control loop event is",ret - _logger.info(msg) + sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 1 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) connection_db.commit() elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): - msg="DBM:Time to clear Control Loop Event for target type - ", target_type - _logger.info(msg) - epoc_time = int(round(time.time())) - #last_date_time = datetime.datetime.now() - if(target_type == "VNF"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "generic-vnf.vnf-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - elif(target_type == "VM"): - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": { "vserver.vserver-name": srcName} , - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }); - else: - continue - payload = json_object - msg="DBM: CL Json object is", json_object - _logger.info(msg) - #psend_url = pol_url+'DefaultGroup/1?timeout=15000' - psend_url = pol_url - msg="DBM:",psend_url - _logger.info(msg) - msg="DBM:Heartbeat Dead raising alarm event "+psend_url - _logger.info(msg) - #Send response for policy on output topic - r = requests.post(psend_url, data=payload) - msg="DBM:",r.status_code, r.reason - _logger.info(msg) - ret = r.status_code - msg="DBM:Status code after raising the control loop event is",ret - _logger.info(msg) + sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) cl_flag = 0 update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) cur.execute(update_query) @@ -211,9 +210,8 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name) _logger.info(msg) - for source_name_key in range(source_name_count): - delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s' and source_name_key=%d;" %(event_name,source_name_key) - cur.execute(delete_query_table2) + delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s';" %(event_name) + cur.execute(delete_query_table2) delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name) cur.execute(delete_query) connection_db.commit() @@ -229,10 +227,13 @@ def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,d if __name__ == "__main__": _logger.info("DBM: DBM Process started") - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties() current_pid = sys.argv[1] jsfile = sys.argv[2] + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) msg="DBM:Parent process ID and json file name",current_pid, jsfile _logger.info(msg) while (True): db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + break diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 5b62943..cb7465c 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -44,12 +44,12 @@ def read_json_file(i): def process_msg(jsfile,user_name, password, ip_address, port_num, db_name): global mr_url i=0 - sleep_duration = 10 + sleep_duration = 20 while(True): time.sleep(sleep_duration) with open(jsfile, 'r') as outfile: cfg = json.load(outfile) - mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']) + mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) while(True): hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) @@ -61,6 +61,7 @@ def process_msg(jsfile,user_name, password, ip_address, port_num, db_name): if(os.getenv('pytest', "") == 'test'): eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"] + connection_db = 0 else: connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() @@ -72,7 +73,7 @@ def process_msg(jsfile,user_name, password, ip_address, port_num, db_name): if "groupID" not in os.environ or "consumerID" not in os.environ: get_url = mr_url + 'DefaultGroup/1?timeout=15000' else: - get_url = mr_url + os.getenv('groupID') + '/' + os.getenv('consumerID') + '?timeout=15000' + get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' msg="HBT:Getting :"+get_url _logger.info(msg) @@ -93,8 +94,6 @@ def process_msg(jsfile,user_name, password, ip_address, port_num, db_name): inputString = res.text #If mrstatus in message body indicates some information, not json msg. if ("mrstatus" in inputString): - if (sleep_duration < 60): - sleep_duration = sleep_duration + 10 continue jlist = inputString.split('\n'); # Process the DMaaP input message retreived @@ -243,5 +242,5 @@ if __name__ == '__main__': _logger.info("HBT:HeartBeat thread Created") msg="HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties(jsfile) process_msg(jsfile,user_name, password, ip_address, port_num, db_name) diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 2069865..36683dc 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -101,8 +101,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password #def get_pol_and_mr_urls(jsfile, pol_url, mr_url): # with open(jsfile, 'r') as outfile: # cfg = json.load(outfile) -# mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']) -# pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) +# mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) +# pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) def read_hb_common(user_name,password,ip_address,port_num,db_name): envPytest = os.getenv('pytest', "") @@ -128,7 +128,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name): def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): current_time = int(round(time.time())) source_name = socket.gethostname() - source_name = source_name + "-" + os.getenv('SERVICE_NAME') + source_name = source_name + "-" + os.getenv('SERVICE_NAME', "") envPytest = os.getenv('pytest', "") if (envPytest != 'test'): connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) @@ -148,6 +148,8 @@ def create_update_hb_common(update_flg, process_id, state, user_name,password,ip def create_update_vnf_table_1(jsfile,update_db,connection_db): with open(jsfile, 'r') as outfile: cfg = json.load(outfile) + hbcfg = cfg['heartbeat_config'] + jhbcfg = json.loads(hbcfg) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] @@ -164,7 +166,7 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db): db_query = "Select event_name from vnf_table_1" cur.execute(db_query) vnf_list = [item[0] for item in cur.fetchall()] - for vnf in (cfg['heartbeat_config']['vnfs']): + for vnf in (jhbcfg['vnfs']): nfc = vnf['eventName'] #_logger.error("MSHBT:",nfc) validity_flag = 1 @@ -218,17 +220,16 @@ def db_monitoring_process(current_pid,jsfile): sys.stdout.flush() _logger.info("MSHBT:Creaated DB Monitoring process") return - -def read_hb_properties(): +def read_hb_properties_default(): #Read the hbproperties.yaml for postgress and CBS related data s=open(hb_properties_file, 'r') a=yaml.load(s) - + if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): ip_address = a['pg_ipAddress'] port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] + user_name = a['pg_userName'] + password = a['pg_passwd'] else: ip_address = os.getenv('pg_ipAddress') port_num = os.getenv('pg_portNum') @@ -242,6 +243,36 @@ def read_hb_properties(): s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval +def read_hb_properties(jsfile): + try: + with open(jsfile, 'r') as outfile: + cfg = json.load(outfile) + except(Exception) as err: + msg = "CBS Json file load error - ",err + _logger.error(msg) + return read_hb_properties_default() + + try: + ip_address = str(cfg['pg_ipAddress']) + port_num = str(cfg['pg_portNum']) + user_name = str(cfg['pg_userName']) + password = str(cfg['pg_passwd']) + dbName = str(cfg['pg_dbName']) + db_name = dbName.lower() + cbs_polling_required = str(cfg['CBS_polling_allowed']) + cbs_polling_interval = str(cfg['CBS_polling_interval']) + consumer_id = str(cfg['consumerID']) + group_id = str(cfg['groupID']) + os.environ['consumerID'] = consumer_id + os.environ['groupID'] = group_id + if("SERVICE_NAME" in cfg.keys()): + os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + except(Exception) as err: + msg = "CBS Json file read parameter error - ",err + _logger.error(msg) + return read_hb_properties_default() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + def fetch_json_file(): if get_cbs_config(): #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" @@ -292,8 +323,10 @@ def create_update_db(update_db, jsfile, ip_address, port_num, user_name, passwor def create_process(job_list, jsfile, pid_current): if(len(job_list) == 0): p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) + time.sleep(1) p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,)) p1.start() + time.sleep(1) p2.start() job_list.append(p1) job_list.append(p2) @@ -309,11 +342,11 @@ def main(): _logger.info("MSHBD:Execution Started") job_list = [] pid_current = os.getpid() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties() + jsfile = fetch_json_file() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval _logger.info(msg) - jsfile = fetch_json_file() - if(cbs_polling_required == True): + if(cbs_polling_required == 'True'): p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,)) p3.start() update_db = 0 @@ -334,7 +367,7 @@ def main(): msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference _logger.info(msg) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) envPytest = os.getenv('pytest', "") if (envPytest == 'test'): if i == 2: diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py index d2b615f..47ba223 100644 --- a/miss_htbt_service/mod/trapd_get_cbs_config.py +++ b/miss_htbt_service/mod/trapd_get_cbs_config.py @@ -58,9 +58,9 @@ def get_cbs_config(): # See if we are in a config binding service (CBS) /controller environment try: + msg = "Unable to fetch CBS config or it is erroneously empty - trying override/simulator config" tds.c_config = get_config() if tds.c_config == {}: - msg = "Unable to fetch CBS config or it is erroneously empty - trying override/simulator config" stdout_logger(msg) # if no CBS present, default to JSON config specified via CBS_HTBT_JSON env var @@ -69,15 +69,15 @@ def get_cbs_config(): stdout_logger(msg) try: + msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" _cbs_sim_json_file = os.getenv("CBS_HTBT_JSON", "None") except Exception as e: - msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" stdout_logger(msg) cleanup(1,None) return False + msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" if _cbs_sim_json_file == "None": - msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" stdout_logger(msg) cleanup(1,None) return False @@ -85,11 +85,11 @@ def get_cbs_config(): msg = ("ONAP controller override specified via CBS_HTBT_JSON: %s" % _cbs_sim_json_file) stdout_logger(msg) + msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \ + " (invalid json?) - FATAL ERROR, exiting" try: tds.c_config = json.load(open(_cbs_sim_json_file)) except Exception as e: - msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \ - " (invalid json?) - FATAL ERROR, exiting" stdout_logger(msg) cleanup_and_exit(0,None) diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py index a76c886..b180bf5 100644 --- a/miss_htbt_service/mod/trapd_vnf_table.py +++ b/miss_htbt_service/mod/trapd_vnf_table.py @@ -35,9 +35,16 @@ import logging import get_logger import yaml import os.path as path +import db_monitoring as dbmon +import json +from onap_dcae_cbs_docker_client.client import get_config +import unittest +import time +import subprocess prog_name = os.path.basename(__file__) hb_properties_file = path.abspath(path.join(__file__, "../../config/hbproperties.yaml")) +_logger = get_logger.get_logger(__name__) def hb_properties(): #Read the hbproperties.yaml for postgress and CBS related data @@ -54,7 +61,6 @@ def hb_properties(): s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval -ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name): connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) @@ -90,17 +96,140 @@ def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name) def verify_cbsPolling_required(): + _cbspolling_status = True + os.environ['pytest']='test' + #os.environ['CONSUL_HOST']='10.12.6.50' # Used this IP during testing + os.environ['CONSUL_HOST']='localhost' + os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' try: _cbspolling_status=cf.config_notif_run() except Exception as e: - return None + print("Config_notify error - ",e) + #return None + os.unsetenv('pytest') + os.unsetenv('CONSUL_HOST') + os.unsetenv('SERVICE_NAME') return _cbspolling_status def verify_cbspolling(): + os.environ['pytest']='test' + os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' try: - _cbspolling=cbs.currentpidMain(10) + _cbspolling=cbs.pollCBS(10) except Exception as e: + #print("CBSP error - ",e) return None + os.unsetenv('pytest') + os.unsetenv('SERVICE_NAME') return _cbspolling + +def verify_fetch_json_file(): + os.environ['pytest']='test' + os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' + #os.environ['CONSUL_HOST']='10.12.6.50' # Used this IP during testing + os.environ['CONSUL_HOST']='localhost' + os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + try: + db.fetch_json_file() + result = True + except Exception as e: + result = False + print(result) + os.unsetenv('pytest') + os.unsetenv('SERVICE_NAME') + os.unsetenv('CONSUL_HOST') + os.unsetenv('HOSTNAME') + return result + +def verify_misshtbtdmain(): + os.environ['pytest']='test' + os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' + #os.environ['CONSUL_HOST']='10.12.6.50' + os.environ['CONSUL_HOST']='localhost' + os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + + try: + db.main() + result = True + except Exception as e: + result = False + print(result) + os.unsetenv('pytest') + os.unsetenv('SERVICE_NAME') + os.unsetenv('CONSUL_HOST') + os.unsetenv('HOSTNAME') + return result + +def verify_dbmonitoring(): + os.environ['pytest']='test' + os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' + #os.environ['CONSUL_HOST']='10.12.6.50' + os.environ['CONSUL_HOST']='localhost' + os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + try: + jsfile = db.fetch_json_file() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + dbmon.db_monitoring(hbc_pid,jsfile,user_name,password,ip_address,port_num,db_name) + result = True + except Exception as e: + print("Message process error - ",e) + result = False + print(result) + os.unsetenv('pytest') + os.unsetenv('SERVICE_NAME') + os.unsetenv('CONSUL_HOST') + os.unsetenv('HOSTNAME') + return result + +def verify_dbmon_startup(): + try: + p = subprocess.Popen(['./miss_htbt_service/db_monitoring.py'], stdout=subprocess.PIPE,shell=True) + time.sleep(1) + except Exception as e: + #print( "Message process error - ",e) + return None + return True + +def verify_sendControlLoop_VNF_ONSET(): + try: +# _CL_return = sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, closed_control_loop_name, version, target) + pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" + _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + except Exception as e: + #msg = "Message process error - ",err + #_logger.error(msg) + return None + return _CL_return + +def verify_sendControlLoop_VM_ONSET(): + try: + pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" + _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + except Exception as e: + #msg = "Message process error - ",err + #_logger.error(msg) + return None + return _CL_return + +def verify_sendControlLoop_VNF_ABATED(): + try: + pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" + _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + except Exception as e: + #msg = "Message process error - ",err + #_logger.error(msg) + return None + return _CL_return + +def verify_sendControlLoop_VM_ABATED(): + try: + pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" + _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + except Exception as e: +# msg = "Message process error - ",err +# _logger.error(msg) + return None + return _CL_return |