aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/misshtbtd.py
diff options
context:
space:
mode:
authorPrakashH <pbhandar@techmahindra.com>2019-01-28 20:49:02 +0000
committerPrakashH <pbhandar@techmahindra.com>2019-01-28 20:49:02 +0000
commita86243058c2daa560aebaecdb096ff63788a6f44 (patch)
tree68d29b8ec0545792d50785b0105914d00c73d884 /miss_htbt_service/misshtbtd.py
parent20110ffeb5071193e7b437e797636d9d6318dcd4 (diff)
Heartbeat Microservice Support
Heartbeat service monitors missing HB notification Issue-ID: DCAEGEN2-267 Change-Id: I0fd191b2a3495202e22f633ada4a1350a97557ad Signed-off-by: PrakashH <pbhandar@techmahindra.com>
Diffstat (limited to 'miss_htbt_service/misshtbtd.py')
-rw-r--r--miss_htbt_service/misshtbtd.py59
1 files changed, 46 insertions, 13 deletions
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 2069865..36683dc 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -101,8 +101,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
#def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
# with open(jsfile, 'r') as outfile:
# cfg = json.load(outfile)
-# mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'])
-# pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url'])
+# mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
+# pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
def read_hb_common(user_name,password,ip_address,port_num,db_name):
envPytest = os.getenv('pytest', "")
@@ -128,7 +128,7 @@ def read_hb_common(user_name,password,ip_address,port_num,db_name):
def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
- source_name = source_name + "-" + os.getenv('SERVICE_NAME')
+ source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
envPytest = os.getenv('pytest', "")
if (envPytest != 'test'):
connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
@@ -148,6 +148,8 @@ def create_update_hb_common(update_flg, process_id, state, user_name,password,ip
def create_update_vnf_table_1(jsfile,update_db,connection_db):
with open(jsfile, 'r') as outfile:
cfg = json.load(outfile)
+ hbcfg = cfg['heartbeat_config']
+ jhbcfg = json.loads(hbcfg)
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
@@ -164,7 +166,7 @@ def create_update_vnf_table_1(jsfile,update_db,connection_db):
db_query = "Select event_name from vnf_table_1"
cur.execute(db_query)
vnf_list = [item[0] for item in cur.fetchall()]
- for vnf in (cfg['heartbeat_config']['vnfs']):
+ for vnf in (jhbcfg['vnfs']):
nfc = vnf['eventName']
#_logger.error("MSHBT:",nfc)
validity_flag = 1
@@ -218,17 +220,16 @@ def db_monitoring_process(current_pid,jsfile):
sys.stdout.flush()
_logger.info("MSHBT:Creaated DB Monitoring process")
return
-
-def read_hb_properties():
+def read_hb_properties_default():
#Read the hbproperties.yaml for postgress and CBS related data
s=open(hb_properties_file, 'r')
a=yaml.load(s)
-
+
if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
ip_address = a['pg_ipAddress']
port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
else:
ip_address = os.getenv('pg_ipAddress')
port_num = os.getenv('pg_portNum')
@@ -242,6 +243,36 @@ def read_hb_properties():
s.close()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+def read_hb_properties(jsfile):
+ try:
+ with open(jsfile, 'r') as outfile:
+ cfg = json.load(outfile)
+ except(Exception) as err:
+ msg = "CBS Json file load error - ",err
+ _logger.error(msg)
+ return read_hb_properties_default()
+
+ try:
+ ip_address = str(cfg['pg_ipAddress'])
+ port_num = str(cfg['pg_portNum'])
+ user_name = str(cfg['pg_userName'])
+ password = str(cfg['pg_passwd'])
+ dbName = str(cfg['pg_dbName'])
+ db_name = dbName.lower()
+ cbs_polling_required = str(cfg['CBS_polling_allowed'])
+ cbs_polling_interval = str(cfg['CBS_polling_interval'])
+ consumer_id = str(cfg['consumerID'])
+ group_id = str(cfg['groupID'])
+ os.environ['consumerID'] = consumer_id
+ os.environ['groupID'] = group_id
+ if("SERVICE_NAME" in cfg.keys()):
+ os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ except(Exception) as err:
+ msg = "CBS Json file read parameter error - ",err
+ _logger.error(msg)
+ return read_hb_properties_default()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
def fetch_json_file():
if get_cbs_config():
#current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
@@ -292,8 +323,10 @@ def create_update_db(update_db, jsfile, ip_address, port_num, user_name, passwor
def create_process(job_list, jsfile, pid_current):
if(len(job_list) == 0):
p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
+ time.sleep(1)
p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,))
p1.start()
+ time.sleep(1)
p2.start()
job_list.append(p1)
job_list.append(p2)
@@ -309,11 +342,11 @@ def main():
_logger.info("MSHBD:Execution Started")
job_list = []
pid_current = os.getpid()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties()
+ jsfile = fetch_json_file()
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
_logger.info(msg)
- jsfile = fetch_json_file()
- if(cbs_polling_required == True):
+ if(cbs_polling_required == 'True'):
p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
p3.start()
update_db = 0
@@ -334,7 +367,7 @@ def main():
msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference
_logger.info(msg)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME'))
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
envPytest = os.getenv('pytest', "")
if (envPytest == 'test'):
if i == 2: