From 20110ffeb5071193e7b437e797636d9d6318dcd4 Mon Sep 17 00:00:00 2001 From: SrikanthNaidu Date: Mon, 10 Dec 2018 21:46:40 +0530 Subject: Heartbeat Microservice Support Heartbeat service monitors missing HB notification Issue-ID: DCAEGEN2-267 Change-Id: I21f36056e9509a167bff476231a6bbd661aca1b9 Signed-off-by: SrikanthNaidu --- miss_htbt_service/misshtbtd.py | 461 +++++++++++++++++++++++++++++++++-------- 1 file changed, 378 insertions(+), 83 deletions(-) mode change 100755 => 100644 miss_htbt_service/misshtbtd.py (limited to 'miss_htbt_service/misshtbtd.py') diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py old mode 100755 new mode 100644 index 02433e9..2069865 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # ============LICENSE_START======================================================= # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ @@ -17,110 +16,406 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -# Author Gokul Singaraju gs244f@att.com -# - +# This is a main process that does the following +# - Creates the CBS polling process that indicates the periodic download of +# configuration file from CBS +# - Creates heartbeat worker process that receives the Heartbeat messages from VNF +# - Creates DB Monitoring process that generates Control loop event +# - Download the CBS configuration and populate the DB +# +# Author Prakash Hosangady(ph553f@att.com) +import traceback import os import sys import json +import datetime +import time +import math import multiprocessing import logging import subprocess +import yaml +import socket import get_logger from pathlib import Path - import mod.trapd_settings as tds +import htbtworker as heartbeat +import os.path as path + +hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) from mod.trapd_runtime_pid import save_pid, rm_pid from mod.trapd_get_cbs_config import get_cbs_config -#from mod.trapd_exit import cleanup_and_exit +from mod.trapd_exit import cleanup_and_exit from mod.trapd_http_session import init_session_obj +ip_address = "localhost" +port_num = 5432 +user_name = "postgres" +password = "postgres" +db_name = "hb_vnf" +cbs_polling_required = "true" +cbs_polling_interval = 300 +mr_url = None +pol_url = None +update_db = 0 +jsfile='empty' +import sys +ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py")) +ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py")) +ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py")) +ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py")) +def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): + from psycopg2 import connect + import sys + try: + con = connect(user=user_name, host = ip_address, password = password) + database_name = db_name + con.autocommit = True + cur = con.cursor() + query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name) + cur.execute(query_value) + not_exists_row = cur.fetchone() + msg = "MSHBT:Create_database:DB not exists? ", not_exists_row + _logger.info(msg) + not_exists = not_exists_row[0] + if not_exists is True: + _logger.info("MSHBT:Creating database ...") + query_value = "CREATE DATABASE %s" %(database_name) + cur.execute(query_value) + else: + _logger.info("MSHBD:Database already exists") + ''' + con = None + con = connect(user=user_name, host = ip_address, password=password) + database_name = db_name + con.autocommit = True + cur = con.cursor() + cur.execute('CREATE DATABASE %s IF NOT EXISTS %s' %(database_name,database_name)) + ''' + cur.close() + con.close() + except(Exception) as err: + msg = "MSHBD:DB Creation -",err + _logger.error(msg) -mr_url = 'http://mrrouter.onap.org:3904' -pol_url = 'http://mrrouter.onap.org:3904' -intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' -outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' +#def get_pol_and_mr_urls(jsfile, pol_url, mr_url): +# with open(jsfile, 'r') as outfile: +# cfg = json.load(outfile) +# mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']) +# pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) -#Checks heartbeat by calling worker thread -def checkhtbt(mr_url, intopic, pol_url, outopic, nfc, misshtbt,intvl, cl_loop): - print('Doing some work',mr_url, misshtbt,intvl,intopic,outopic) - my_file = Path("./miss_htbt_service/htbtworker.py") - if my_file.is_file(): - subprocess.call(["python","./miss_htbt_service/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ]) +def read_hb_common(user_name,password,ip_address,port_num,db_name): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + hbc_pid = 10 + hbc_srcName = "srvc_name" + hbc_time = 1584595881 + hbc_state = "RUNNING" + return hbc_pid, hbc_state, hbc_srcName, hbc_time + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" + cur.execute(query_value) + rows = cur.fetchall() + hbc_pid = rows[0][0] + hbc_srcName = rows[0][1] + hbc_time = rows[0][2] + hbc_state = rows[0][3] + heartbeat.commit_and_close_db(connection_db) + cur.close() + return hbc_pid, hbc_state, hbc_srcName, hbc_time + +def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): + current_time = int(round(time.time())) + source_name = socket.gethostname() + source_name = source_name + "-" + os.getenv('SERVICE_NAME') + envPytest = os.getenv('pytest', "") + if (envPytest != 'test'): + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False): + cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);") + query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state) + _logger.info("MSHBT:Created hb_common DB and updated new values") + cur.execute(query_value) + if(update_flg == 1): + query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state) + _logger.info("MSHBT:Updated hb_common DB with new values") + cur.execute(query_value) + heartbeat.commit_and_close_db(connection_db) + cur.close() + +def create_update_vnf_table_1(jsfile,update_db,connection_db): + with open(jsfile, 'r') as outfile: + cfg = json.load(outfile) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] + else: + cur = connection_db.cursor() + if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): + cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);") + _logger.info("MSHBT:Created vnf_table_1 table") + if(update_db == 1): + query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;" + cur.execute(query_value) + _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table") + # Put some initial values into the queue + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + vnf_list = [item[0] for item in cur.fetchall()] + for vnf in (cfg['heartbeat_config']['vnfs']): + nfc = vnf['eventName'] + #_logger.error("MSHBT:",nfc) + validity_flag = 1 + source_name_count = 0 + missed = vnf['heartbeatcountmissed'] + intvl = vnf['heartbeatinterval'] + clloop = vnf['closedLoopControlName'] + policyVersion = vnf['policyVersion'] + policyName = vnf['policyName'] + policyScope = vnf['policyScope'] + target_type = vnf['target_type'] + target = vnf['target'] + version = vnf['version'] + + if(nfc not in vnf_list): + query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag) else: - subprocess.call(["python","/opt/app/misshtbt/bin/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ]) + query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc) + if (envPytest != 'test'): + cur.execute(query_value) + #heartbeat.commit_and_close_db(connection_db) + if (envPytest != 'test'): + cur.close() + _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") + +def hb_cbs_polling_process(pid_current): + my_file = Path("./miss_htbt_service/cbs_polling.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ]) sys.stdout.flush() + _logger.info("MSHBT:Creaated CBS polling process") + return +def hb_worker_process(config_file_path): + my_file = Path("./miss_htbt_service/htbtworker.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated Heartbeat worker process") return -_logger = get_logger.get_logger(__name__) +def db_monitoring_process(current_pid,jsfile): + my_file = Path("./miss_htbt_service/db_monitoring.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated DB Monitoring process") + return -#main functon which reads yaml config and invokes heartbeat -#monitoring -if __name__ == '__main__': - try: - print("Heartbeat Microservice ...") - if "INURL" in os.environ.keys(): - mr_url = os.environ['INURL'] - if "INTOPIC" in os.environ.keys(): - intopic = os.environ['INTOPIC'] - if "OUTURL" in os.environ.keys(): - pol_url = os.environ['OUTURL'] - if "OUTOPIC" in os.environ.keys(): - outopic = os.environ['OUTOPIC'] - print(outopic) - multiprocessing.log_to_stderr() - logger = multiprocessing.get_logger() - logger.setLevel(logging.INFO) - my_env = os.environ.copy() - my_env["PYTHONPATH"] = my_env["PYTHONPATH"]+":/usr/local/lib/python3.6"+":./miss_htbt_service/" - my_env["PATH"] = my_env["PATH"]+":./bin/:./miss_htbt_service/" - p = subprocess.Popen(['check_health.py'],stdout=subprocess.PIPE,stderr=subprocess.STDOUT,env=my_env) - #print(p.communicate()) - jsfile='empty' - - # re-request config from config binding service - # (either broker, or json file override) - if get_cbs_config(): - current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" - msg = "current config logged to : %s" % current_runtime_config_file_name - logger.error(msg) - print(msg) +def read_hb_properties(): + #Read the hbproperties.yaml for postgress and CBS related data + s=open(hb_properties_file, 'r') + a=yaml.load(s) + + if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + else: + ip_address = os.getenv('pg_ipAddress') + port_num = os.getenv('pg_portNum') + user_name = os.getenv('pg_userName') + password = os.getenv('pg_passwd') + + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + +def fetch_json_file(): + if get_cbs_config(): + #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json" + else: + current_runtime_config_file_name = "../etc/download.json" + msg = "MSHBD:current config logged to : %s" % current_runtime_config_file_name + _logger.info(msg) with open(current_runtime_config_file_name, 'w') as outfile: json.dump(tds.c_config, outfile) - jsfile = current_runtime_config_file_name - else: - msg = "CBS Config not available using local config" - logger.error(msg) - print(msg) + if os.getenv('pytest', "") == 'test': + jsfile = current_runtime_config_file_name + else: + jsfile = "../etc/config.json" + os.system('cp ../etc/download.json ../etc/config.json') + os.remove("../etc/download.json") + else: + msg = "MSHBD:CBS Config not available, using local config" + _logger.warning(msg) my_file = Path("./etc/config.json") if my_file.is_file(): - jsfile = "./etc/config.json" + jsfile = "./etc/config.json" else: - jsfile = "../etc/config.json" - - print("opening %s " % jsfile) - with open(jsfile, 'r') as outfile: - cfg = json.load(outfile) - # Put some initial values into the queue - mr_url = cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'] - pol_url = cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url'] - jobs = [] - print(cfg['heartbeat_config']) - for vnf in (cfg['heartbeat_config']['vnfs']): - print(vnf) - nfc = vnf['nfNamingCode'] - missed = vnf['heartbeatcountmissed'] - intvl = vnf['heartbeatinterval'] - clloop = vnf['closedLoopControlName'] - print('{0} {1} {2} {3}'.format(nfc,missed,intvl,clloop)) - #Start Heartbeat monitoring process worker thread on VNFs configured - logger.info("Starting threads...") - p = multiprocessing.Process(target=checkhtbt, args=( mr_url, intopic, pol_url, outopic, nfc, missed, intvl, clloop)) - jobs.append(p) - p.start() - for j in jobs: - j.join() - print('%s.exitcode = %s' % (j.name, j.exitcode)) - except Exception as e: - _logger.error("Fatal error. Could not start missing heartbeat service due to: {0}".format(e)) + jsfile = "../etc/config.json" + msg = "MSHBT: The json file is - ", jsfile + _logger.info(msg) + return jsfile + +def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): + envPytest = os.getenv('pytest', "") + if (envPytest != 'test'): + if(update_db == 0): + create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name + _logger.info(msg) + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(update_db == 0): + if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): + create_update_vnf_table_1(jsfile,update_db,connection_db) + else: + create_update_vnf_table_1(jsfile,update_db,connection_db) + heartbeat.commit_and_close_db(connection_db) + cur.close() + +def create_process(job_list, jsfile, pid_current): + if(len(job_list) == 0): + p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) + p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,)) + p1.start() + p2.start() + job_list.append(p1) + job_list.append(p2) + msg = "MSHBD:jobs list is",job_list + _logger.info(msg) + return job_list + +_logger = get_logger.get_logger(__name__) + +def main(): + try: + p = subprocess.Popen(['python3.6',ABSOLUTE_PATH3],stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + _logger.info("MSHBD:Execution Started") + job_list = [] + pid_current = os.getpid() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties() + msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + _logger.info(msg) + jsfile = fetch_json_file() + if(cbs_polling_required == True): + p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,)) + p3.start() + update_db = 0 + create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + state = "RECONFIGURATION" + update_flg = 0 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + msg = "MSHBD:Current process id is",pid_current + _logger.info(msg) + _logger.info("MSHBD:Now be in a continuous loop") + i=0 + while(True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) + msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time + _logger.info(msg) + current_time = int(round(time.time())) + time_difference = current_time - hbc_time + msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference + _logger.info(msg) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + if i == 2: + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RECONFIGURATION" + elif (i>3): + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RUNNING" + if (time_difference <60): + if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)): + msg = "MSHBD:config status is",hbc_state + _logger.info(msg) + if(hbc_state=="RUNNING"): + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + elif(hbc_state=="RECONFIGURATION"): + _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + jsfile = fetch_json_file() + update_db = 1 + create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + + else: + _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") + if(len(job_list)>=2): + _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") + else: + jsfile = fetch_json_file() + msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + else: + _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover") + if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)): + _logger.info("MSHBD:Initiating to become Active Instance") + if(len(job_list)>=2): + _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE") + else: + jsfile = fetch_json_file() + msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) + update_flg = 1 + create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name) + else: + _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") + time.sleep(25) + if os.getenv('pytest', "") == 'test': + i = i + 1 + if(i > 5): + _logger.info("Terminating main process for pytest") + p3.terminate() + time.sleep(1) + p3.join() + if(len(job_list)>0): + job_list[0].terminate() + time.sleep(1) + job_list[0].join() + job_list.remove(job_list[0]) + if(len(job_list)>0): + job_list[0].terminate() + time.sleep(1) + job_list[0].join() + job_list.remove(job_list[0]) + break + + except (Exception) as e: + msg = "MSHBD:Exception as %s" %(str(traceback.format_exc())) + _logger.error(msg) + + msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e) + _logger.error(msg) + +if __name__ == '__main__': + main() -- cgit 1.2.3-korg