diff options
Diffstat (limited to 'miss_htbt_service')
18 files changed, 1486 insertions, 616 deletions
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py new file mode 100644 index 0000000..4212ab7 --- /dev/null +++ b/miss_htbt_service/cbs_polling.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Author Prakash Hosangady(ph553f@att.com) +# CBS Polling +# Set the hb_common table with state="RECONFIGURATION" periodically +# to get the new configuration downloaded + +import requests +import sched, datetime, time +import string +import sys +import os +import socket +import htbtworker as pm +import misshtbtd as db +import logging +import get_logger +_logger = get_logger.get_logger(__name__) + + +def pollCBS(current_pid): + + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties() + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + msg="CBSP:Main process ID in hb_common is %d",hbc_pid + _logger.info(msg) + msg="CBSP:My parent process ID is %d",current_pid + _logger.info(msg) + msg="CBSP:CBS Polling interval is %d", cbs_polling_interval + _logger.info(msg) + time.sleep(cbs_polling_interval) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + #connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + #cur = connection_db.cursor() + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + result= True + if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): + _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION") + state = "RECONFIGURATION" + update_flg = 1 + db.create_update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name) + else: + _logger.info("CBSP:Inactive instance or hb_common state is not RUNNING") + return result +if __name__ == "__main__": + current_pid = sys.argv[1] + while(True): + pollCBS(current_pid) diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py index ae61881..fb99584 100755..100644 --- a/miss_htbt_service/check_health.py +++ b/miss_htbt_service/check_health.py @@ -16,6 +16,7 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. + from http.server import HTTPServer, BaseHTTPRequestHandler from urllib import parse diff --git a/miss_htbt_service/config/config.yaml b/miss_htbt_service/config/config.yaml deleted file mode 100644 index 0dcc8bf..0000000 --- a/miss_htbt_service/config/config.yaml +++ /dev/null @@ -1,16 +0,0 @@ -global: - host: localhost - message_router_url: http://msgrouter.att.com:3904 -# Missing heartbeats -# Heartbeat interval -# Input topic -# Output topic -# ClosedLoopControlName -vnfs: - vnfa: - - 3 - - 60 - - VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT - - DCAE-POLICY-HILOTCA-EVENT-OUTPUT - - ControlLoopEvent1 - diff --git a/miss_htbt_service/config/hbproperties.yaml b/miss_htbt_service/config/hbproperties.yaml new file mode 100644 index 0000000..b0806e4 --- /dev/null +++ b/miss_htbt_service/config/hbproperties.yaml @@ -0,0 +1,11 @@ +#Postgres database input +#pg_ipAddress: 127.0.0.1 +pg_ipAddress: 10.0.4.1 +pg_portNum: 5432 +pg_userName: postgres +pg_passwd: postgres +pg_dbName: hb_vnf + +#Periodic polling of CBS config download +CBS_polling_allowed: True +CBS_polling_interval: 300 diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py new file mode 100644 index 0000000..242b0e9 --- /dev/null +++ b/miss_htbt_service/config_notif.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Author Prakash Hosangady (ph553f) +# Read the hb_common table +# Update the state to RECONFIGURATION and save the hb_common table + +import os +import sched, datetime, time +import string +import sys +import socket +import yaml +import psycopg2 +from pathlib import Path +import os.path as path + +hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) + +def postgres_db_open(username,password,host,port,database_name): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + return True + connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + return connection + +def db_table_creation_check(connection_db,table_name): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + return True + try: + cur = connection_db.cursor() + query_db = "select * from information_schema.tables where table_name='%s'" %(table_name) + cur.execute(query_db) + database_names = cur.fetchone() + if(database_names is not None): + if(table_name in database_names): + print("HB_Notif::Postgres has already has table -", table_name) + return True + else: + print("HB_Notif::Postgres does not have table - ", table_name) + return False + except (psycopg2.DatabaseError, e): + print('COMMON:Error %s' % e) + finally: + cur.close() + +def commit_and_close_db(connection_db): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + return True + try: + connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.close() + return True + except(psycopg2.DatabaseError, e): + return False + +def read_hb_properties(): + #Read the hbproperties.yaml for postgress and CBS related data + s=open(hb_properties_file, 'r') + a=yaml.load(s) + if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + else: + ip_address = os.getenv('pg_ipAddress') + port_num = os.getenv('pg_portNum') + user_name = os.getenv('pg_userName') + password = os.getenv('pg_passwd') + + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + +def read_hb_common(user_name,password,ip_address,port_num,db_name): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + hbc_pid = 10 + hbc_srcName = "srvc_name" + hbc_time = 1541234567 + hbc_state = "RUNNING" + return hbc_pid, hbc_state, hbc_srcName, hbc_time + connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" + cur.execute(query_value) + rows = cur.fetchall() + print("HB_Notif::hb_common contents - ", rows) + hbc_pid = rows[0][0] + hbc_srcName = rows[0][1] + hbc_time = rows[0][2] + hbc_state = rows[0][3] + commit_and_close_db(connection_db) + cur.close() + return hbc_pid, hbc_state, hbc_srcName, hbc_time + +def update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): + current_time = int(round(time.time())) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + return True + connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state) + cur.execute(query_value) + commit_and_close_db(connection_db) + cur.close() + return True + +#if __name__ == "__main__": +def config_notif_run(): + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties() + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + return True + connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(db_table_creation_check(connection_db,"hb_common") == False): + print("HB_Notif::ERROR::hb_common table not exists - No config download") + connection_db.close() + else: + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) + state = "RECONFIGURATION" + update_flg = 1 + ret = update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name) + if (ret == True): + print("HB_Notif::hb_common table updated with RECONFIGURATION state") + commit_and_close_db(connection_db) + return True + else: + print("HB_Notif::Failure updating hb_common table") + commit_and_close_db(connection_db) + return False + + cur.close() diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py new file mode 100644 index 0000000..6113be2 --- /dev/null +++ b/miss_htbt_service/db_monitoring.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Author Prakash Hosangady(ph553f) +# DB Monitoring +# Tracks Heartbeat messages on each of the VNFs stored in postgres DB +# and generates Missing Heartbeat signal for Policy Engine + +import requests +import math +import sched, datetime, time +import json +import string +import sys +import os +import socket +import requests +import htbtworker as pm +import misshtbtd as db +import logging +import get_logger + +_logger = get_logger.get_logger(__name__) + +def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name): + while(True): + time.sleep(20) + with open(json_file, 'r') as outfile: + cfg = json.load(outfile) + pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) + + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + break + connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"): + _logger.info("DBM: Active DB Monitoring Instance") + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + vnf_list = [item[0] for item in cur.fetchall()] + for event_name in vnf_list: + query_value = "SELECT current_state FROM hb_common;" + cur.execute(query_value) + rows = cur.fetchall() + hbc_state = rows[0][0] + if( hbc_state == "RECONFIGURATION"): + _logger.info("DBM:Waiting for hb_common state to become RUNNING") + break + + db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name) + cur.execute(db_query) + rows = cur.fetchall() + validity_flag = rows[0][0] + source_name_count = rows[0][1] + heartbeat_interval = rows[0][2] + heartbeat_missed_count = rows[0][3] + closed_control_loop_name = rows[0][4] + policy_version = rows[0][5] + policy_name = rows[0][6] + policy_scope = rows[0][7] + target_type = rows[0][8] + target = rows[0][9] + version = rows[0][10] + comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000 + if (validity_flag ==1): + for source_name_key in range(source_name_count): + epoc_time = int(round(time.time()*1000)) + epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1)) + cur.execute(epoc_query) + row = cur.fetchall() + if (len(row)==0): + continue + epoc_time_sec = row[0][0] + srcName = row[0][1] + cl_flag = row[0][2] + vnfName = event_name + if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0): + msg="DBM:Time to raise Control Loop Event for target type - ", target_type + _logger.info(msg) + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + continue + payload = json_object + msg="DBM: CL Json object is", json_object + _logger.info(msg) + #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + psend_url = pol_url + msg="DBM:",psend_url + _logger.info(msg) + msg="DBM:DB monitoring raising alarm event "+psend_url + _logger.info(msg) + #Send response for policy on output topic + r = requests.post(psend_url, data=payload) + msg="DBM:",r.status_code, r.reason + _logger.info(msg) + ret = r.status_code + msg="DBM:Status code after raising the control loop event is",ret + _logger.info(msg) + cl_flag = 1 + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + cur.execute(update_query) + connection_db.commit() + elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1): + msg="DBM:Time to clear Control Loop Event for target type - ", target_type + _logger.info(msg) + epoc_time = int(round(time.time())) + #last_date_time = datetime.datetime.now() + if(target_type == "VNF"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "generic-vnf.vnf-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + elif(target_type == "VM"): + json_object = json.dumps({ + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": { "vserver.vserver-name": srcName} , + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE" + }); + else: + continue + payload = json_object + msg="DBM: CL Json object is", json_object + _logger.info(msg) + #psend_url = pol_url+'DefaultGroup/1?timeout=15000' + psend_url = pol_url + msg="DBM:",psend_url + _logger.info(msg) + msg="DBM:Heartbeat Dead raising alarm event "+psend_url + _logger.info(msg) + #Send response for policy on output topic + r = requests.post(psend_url, data=payload) + msg="DBM:",r.status_code, r.reason + _logger.info(msg) + ret = r.status_code + msg="DBM:Status code after raising the control loop event is",ret + _logger.info(msg) + cl_flag = 0 + update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1)) + cur.execute(update_query) + connection_db.commit() + + else: + msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name) + _logger.info(msg) + + for source_name_key in range(source_name_count): + delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s' and source_name_key=%d;" %(event_name,source_name_key) + cur.execute(delete_query_table2) + delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name) + cur.execute(delete_query) + connection_db.commit() + """ + Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 + """ + else: + msg="DBM:Inactive instance or hb_common state is not RUNNING" + _logger.info(msg) + pm.commit_and_close_db(connection_db) + cur.close() + break; + +if __name__ == "__main__": + _logger.info("DBM: DBM Process started") + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties() + current_pid = sys.argv[1] + jsfile = sys.argv[2] + msg="DBM:Parent process ID and json file name",current_pid, jsfile + _logger.info(msg) + while (True): + db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name) diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py index a18ed5c..e8d008c 100755..100644 --- a/miss_htbt_service/get_logger.py +++ b/miss_htbt_service/get_logger.py @@ -22,7 +22,8 @@ import logging '''Configures the module root logger''' root = logging.getLogger() if root.handlers: - root.handlers.clear() + #root.handlers.clear() + del root.handlers[:] formatter = logging.Formatter('%(asctime)s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s') handler = logging.StreamHandler() handler.setFormatter(formatter) diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 6123386..5b62943 100755..100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,217 +13,235 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# Author Gokul Singaraju gs244f@att.com +# Author Prakash Hosangady(ph553f@att.com) # Simple Microservice # Tracks Heartbeat messages on input topic in DMaaP -# and generates Missing Heartbeat signal for Policy Engine +# and poppulate the information in postgres DB +import psycopg2 import requests -import math -import sched, datetime, time -import json -import string -import sys +import os +import json,sys,time +import misshtbtd as db +import logging +import get_logger +import os.path as path +_logger = get_logger.get_logger(__name__) -# Initialise tracking hash tables -intvl = 60 -missing_htbt = 2 -#tracks last epoch time -hearttrack = {} -#tracks sequence number -heartstate = {} -#tracks sequence number differences -heartflag = {} -#saves heartbeat message for policy -heartmsg = {} -mr_url = 'http://mrrouter.onap.org:3904' -pol_url = 'http://mrrouter.onap.org:3904' -intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' -outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' -nfc = "vVNF" -cl_loop = 'ControlLoopEvent1' -periodic_scheduler = None +def read_json_file(i): + if (i==0): + with open (path.abspath(path.join(__file__, "../../tests/test1.json")), "r") as outfile: + cfg = json.load(outfile) + elif (i == 1): + with open (path.abspath(path.join(__file__, "../../tests/test2.json")), "r") as outfile: + cfg = json.load(outfile) + elif (i ==2): + with open( path.abspath(path.join(__file__, "../../tests/test3.json")), 'r') as outfile: + cfg = json.load(outfile) + return cfg -# Checks for heartbeat event on periodic basis -class PeriodicScheduler(object): - def __init__(self): - self.scheduler = sched.scheduler(time.time, time.sleep) - - def setup(self, interval, action, actionargs=()): - #print("Args are :", locals()) - action(*actionargs) - self.scheduler.enter(interval, 1, self.setup,(interval, action, actionargs)) - def run(self): - self.scheduler.run() +def process_msg(jsfile,user_name, password, ip_address, port_num, db_name): + global mr_url + i=0 + sleep_duration = 10 + while(True): + time.sleep(sleep_duration) + with open(jsfile, 'r') as outfile: + cfg = json.load(outfile) + mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']) - def stop(self): - list(map(self.scheduler.cancel, self.scheduler.queue)) + while(True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) + if(hbc_state == "RECONFIGURATION"): + _logger.info("HBT:Waiting for hb_common state to become RUNNING") + time.sleep(10) + else: + break + + if(os.getenv('pytest', "") == 'test'): + eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"] + else: + connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name) + cur = connection_db.cursor() + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + eventnameList = [item[0] for item in cur.fetchall()] + msg="\n\nHBT:eventnameList values ", eventnameList + _logger.info(msg) + if "groupID" not in os.environ or "consumerID" not in os.environ: + get_url = mr_url + 'DefaultGroup/1?timeout=15000' + else: + get_url = mr_url + os.getenv('groupID') + '/' + os.getenv('consumerID') + '?timeout=15000' + msg="HBT:Getting :"+get_url + _logger.info(msg) + if(os.getenv('pytest', "") == 'test'): + jsonobj = read_json_file(i) + jobj = [] + jobj.append(jsonobj) + i=i+1 + msg="HBT:newly received test message", jobj + _logger.info(msg) + if (i >= 3): + i=0 + break + else: + res = requests.get(get_url) + msg="HBT:",res.text + _logger.info(msg) + inputString = res.text + #If mrstatus in message body indicates some information, not json msg. + if ("mrstatus" in inputString): + if (sleep_duration < 60): + sleep_duration = sleep_duration + 10 + continue + jlist = inputString.split('\n'); + # Process the DMaaP input message retreived + for line in jlist: + try: + jobj = json.loads(line) + except ValueError: + msg='HBT:Decoding JSON has failed' + _logger.error(msg) + continue + if len(jobj) == 0: + continue + for item in jobj: + try: + if(os.getenv('pytest', "") == 'test'): + jitem = jsonobj + else: + jitem = json.loads(item) + srcname = (jitem['event']['commonEventHeader']['sourceName']) + lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec']) + seqnum = (jitem['event']['commonEventHeader']['sequence']) + eventName = (jitem['event']['commonEventHeader']['eventName']) + except(Exception) as err: + msg = "HBT message process error - ",err + _logger.error(msg) + continue + msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname + _logger.info(msg) + if(db_table_creation_check(connection_db,"vnf_table_2") ==False): + msg="HBT:Creating vnf_table_2" + _logger.info(msg) + cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);") + else: + msg="HBT:vnf_table_2 is already there" + _logger.info(msg) + if(eventName in eventnameList): + db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName) + msg="HBT:",db_query + _logger.info(msg) + if(os.getenv('pytest', "") == 'test'): + break + cur.execute(db_query) + row = cur.fetchone() + source_name_count = row[0] + source_name_key = source_name_count+1 + cl_flag = 0 + if(source_name_count==0): + msg="HBT: Insert entry in table_2,source_name_count=0 : ",row + _logger.info(msg) + query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) + cur.execute(query_value) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) + cur.execute(update_query) + else: + msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count + _logger.info(msg) + for source_name_key in range(source_name_count): + epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1)) + msg="HBT:eppc query is",epoc_query + _logger.info(msg) + cur.execute(epoc_query) + row = cur.fetchall() + if (len(row)==0): + continue + db_srcname = row[0][0] + if (db_srcname == srcname): + msg="HBT: Update vnf_table_2 : ",source_name_key, row + _logger.info(msg) + update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1)) + cur.execute(update_query) + source_name_key = source_name_count + break + else: + continue + msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count + _logger.info(msg) + if (source_name_count == (source_name_key+1)): + source_name_key = source_name_count+1 + msg="HBT: Insert entry in table_2 : ",row + _logger.info(msg) + insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag) + cur.execute(insert_query) + update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName) + cur.execute(update_query) + else: + _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") + commit_db(connection_db) + commit_and_close_db(connection_db) + if(os.getenv('pytest', "") != 'test'): + cur.close() + +def postgres_db_open(username,password,host,port,database_name): + + if(os.getenv('pytest', "") == 'test'): + return True + connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port) + return connection -# Process the heartbeat event on input topic -def periodic_event(): - global periodic_scheduler - global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop - ret = 0 - #print("Args are :", locals()) - print("{0} Checking...".format(datetime.datetime.now())) - #Read heartbeat - #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' - get_url = mr_url+'/DefaultGroup/1?timeout=15000' - print("Getting :"+get_url) - try: - res = requests.get(get_url) - #print(res) - #print(res.headers) - print(res.text) - #print(res.json) - inputString = res.text - #jlist = json.loads(inputString) - jlist = inputString.split('\n'); - #print("List:"+jlist[0]) - # Process the DMaaP input message retreived - for line in jlist: - print("Line:"+line) - try: - jobj = json.loads(line) - except ValueError: - print('Decoding JSON has failed') - continue - #print(jobj) - srcid = (jobj['event']['commonEventHeader']['sourceId']) - lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec']) - seqnum = (jobj['event']['commonEventHeader']['sequence']) - nfcode = (jobj['event']['commonEventHeader']['nfNamingCode']) - if( nfcode and nfc != nfcode): - continue - if( srcid in hearttrack ): - tdiff = lastepo - hearttrack[srcid] - sdiff = seqnum - heartstate[srcid] - print("Existing source time diff :"+str(tdiff)+" seqdiff :"+str(sdiff)) - # check time difference is within limits and seq num is less than allowed - if((0 <= tdiff <= 61000000) and sdiff < missing_htbt): - print("Heartbeat Alive...") - hearttrack[srcid] = lastepo - heartstate[srcid] = seqnum; - heartflag[srcid] = sdiff; - heartmsg[srcid] = jobj; - else: - jobj["internalHeaderFields"] = json.dumps({ - "closedLoopFlag": "True", - "eventTag": "hp.Heartbeat Service.20171022.8447964515", - "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", - "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", - "closedLoopControlName": cl_loop, - "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" - }); - heartmsg[srcid] = jobj; - payload = heartmsg[srcid] - print(payload) - #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' - psend_url = pol_url+'/DefaultGroup/1?timeout=15000' - print(psend_url) - print("Heartbeat Dead raising alarm event "+psend_url) - #Send response for policy on output topic - r = requests.post(psend_url, data=payload) - print(r.status_code, r.reason) - ret = r.status_code - del heartstate[srcid] - del hearttrack[srcid] - del heartflag[srcid] - else: - print("Adding new source") - hearttrack[srcid] = lastepo - heartstate[srcid] = seqnum - heartflag[srcid] = 1 - heartmsg[srcid] = jobj; - ret = 1 - chkeys = [] - for key in heartstate.keys(): - print(key,heartstate[key]) - if( heartflag[key] == 0 ): - print("Heartbeat Dead raise alarm event"+key) - chkeys.append( key ) - #print payload - heartmsg[key]["internalHeaderFields"] = json.dumps({ - "closedLoopFlag": "True", - "eventTag": "hp.Heartbeat Service.20171022.8447964515", - "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT", - "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000", - "closedLoopControlName": cl_loop, - "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000" - }) - payload = heartmsg[key] - print(payload) - send_url = pol_url+'/DefaultGroup/1?timeout=15000' - print(send_url) - r = requests.post(send_url, data=payload) - print(r.status_code, r.reason) - ret = r.status_code - heartflag[key] = 0 - for chkey in chkeys: - print(chkey) - del heartstate[chkey] - del hearttrack[chkey] - del heartflag[chkey] - except requests.exceptions.ConnectionError: - print("Connection refused ..") - return ret +def db_table_creation_check(connection_db,table_name): + if(os.getenv('pytest', "") == 'test'): + return True + try: + cur = connection_db.cursor() + query_db = "select * from information_schema.tables where table_name='%s'" %(table_name) + cur.execute(query_db) + database_names = cur.fetchone() + if(database_names is not None): + if(table_name in database_names): + return True + else: + return False + + + except (psycopg2.DatabaseError, e): + msg = 'COMMON:Error %s' % e + _logger.error(msg) + finally: + cur.close() -#test setup for coverage -def test_setup(args): - global mr_url, pol_url, missing_htbt, intvl, intopic, outopic - missing_htbt = float(int(args[2])) - intvl = float(int(args[3])) - intopic = args[4] - outopic = args[5] - mr_url = get_collector_uri()+'/events/'+intopic - pol_url = get_policy_uri()+'/events/'+outopic - print ("Message router url %s " % mr_url) - print ("Policy url %s " % pol_url) - print ("Interval %s " % intvl) - print ("Input topic %s " % intopic) - print ("Output topic %s " % outopic) - #intvl = 60 # every second +def commit_db(connection_db): + if(os.getenv('pytest', "") == 'test'): + return True + try: + connection_db.commit() # <--- makes sure the change is shown in the database + return True + except(psycopg2.DatabaseError, e): + msg = 'COMMON:Error %s' % e + _logger.error(msg) + return False -#Main invocation -def main(args): - global periodic_scheduler - global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop - #mr_url = get_collector_uri() - #pol_url = get_policy_uri() - mr_url = args[0] - intopic = args[1] - pol_url = args[2] - outopic = args[3] - nfc = args[4] - missing_htbt = int(args[5]) - intvl = int(args[6]) - cl_loop = args[7] - print ("Message router url %s " % mr_url) - print ("Policy router url %s " % pol_url) - print ("VNF %s " % nfc) - print ("Interval %s " % intvl) - if( cl_loop != "internal_test") : - #intvl = 60 # every second - #Start periodic scheduler runs every interval - periodic_scheduler = PeriodicScheduler() - periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once - periodic_scheduler.run() # it starts the scheduler +def commit_and_close_db(connection_db): + if(os.getenv('pytest', "") == 'test'): + return True + try: + connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.close() + return True + except(psycopg2.DatabaseError, e): + msg = 'COMMON:Error %s' % e + _logger.error(msg) + return False -if __name__ == "__main__": - total = len(sys.argv) - cmdargs = str(sys.argv) - print ("The total numbers of args passed to the script: %d " % total) - print ("Missing Heartbeat Args list: %s " % cmdargs) - print ("Script name: %s" % str(sys.argv[0])) - for i in range(total): - print ("Argument # %d : %s" % (i, str(sys.argv[i]))) - main(sys.argv[1:]) - - -#force stop scheduler -def stop(): - global periodic_scheduler - if not periodic_scheduler is None: - periodic_scheduler.stop() +if __name__ == '__main__': + jsfile = sys.argv[1] + msg="HBT:HeartBeat thread Created" + _logger.info("HBT:HeartBeat thread Created") + msg="HBT:The config file name passed is -%s", jsfile + _logger.info(msg) + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties() + process_msg(jsfile,user_name, password, ip_address, port_num, db_name) diff --git a/miss_htbt_service/misshtbt.sh b/miss_htbt_service/misshtbt.sh index 5b598b1..5b598b1 100755..100644 --- a/miss_htbt_service/misshtbt.sh +++ b/miss_htbt_service/misshtbt.sh diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 02433e9..2069865 100755..100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # ============LICENSE_START======================================================= # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ @@ -17,110 +16,406 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -# Author Gokul Singaraju gs244f@att.com -# - +# This is a main process that does the following +# - Creates the CBS polling process that indicates the periodic download of +# configuration file from CBS +# - Creates heartbeat worker process that receives the Heartbeat messages from VNF +# - Creates DB Monitoring process that generates Control loop event +# - Download the CBS configuration and populate the DB +# +# Author Prakash Hosangady(ph553f@att.com) +import traceback import os import sys import json +import datetime +import time +import math import multiprocessing import logging import subprocess +import yaml +import socket import get_logger from pathlib import Path - import mod.trapd_settings as tds +import htbtworker as heartbeat +import os.path as path + +hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml")) from mod.trapd_runtime_pid import save_pid, rm_pid from mod.trapd_get_cbs_config import get_cbs_config -#from mod.trapd_exit import cleanup_and_exit +from mod.trapd_exit import cleanup_and_exit from mod.trapd_http_session import init_session_obj +ip_address = "localhost" +port_num = 5432 +user_name = "postgres" +password = "postgres" +db_name = "hb_vnf" +cbs_polling_required = "true" +cbs_polling_interval = 300 +mr_url = None +pol_url = None +update_db = 0 +jsfile='empty' +import sys +ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py")) +ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py")) +ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py")) +ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py")) +def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): + from psycopg2 import connect + import sys + try: + con = connect(user=user_name, host = ip_address, password = password) + database_name = db_name + con.autocommit = True + cur = con.cursor() + query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name) + cur.execute(query_value) + not_exists_row = cur.fetchone() + msg = "MSHBT:Create_database:DB not exists? ", not_exists_row + _logger.info(msg) + not_exists = not_exists_row[0] + if not_exists is True: + _logger.info("MSHBT:Creating database ...") + query_value = "CREATE DATABASE %s" %(database_name) + cur.execute(query_value) + else: + _logger.info("MSHBD:Database already exists") + ''' + con = None + con = connect(user=user_name, host = ip_address, password=password) + database_name = db_name + con.autocommit = True + cur = con.cursor() + cur.execute('CREATE DATABASE %s IF NOT EXISTS %s' %(database_name,database_name)) + ''' + cur.close() + con.close() + except(Exception) as err: + msg = "MSHBD:DB Creation -",err + _logger.error(msg) -mr_url = 'http://mrrouter.onap.org:3904' -pol_url = 'http://mrrouter.onap.org:3904' -intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' -outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' +#def get_pol_and_mr_urls(jsfile, pol_url, mr_url): +# with open(jsfile, 'r') as outfile: +# cfg = json.load(outfile) +# mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']) +# pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']) -#Checks heartbeat by calling worker thread -def checkhtbt(mr_url, intopic, pol_url, outopic, nfc, misshtbt,intvl, cl_loop): - print('Doing some work',mr_url, misshtbt,intvl,intopic,outopic) - my_file = Path("./miss_htbt_service/htbtworker.py") - if my_file.is_file(): - subprocess.call(["python","./miss_htbt_service/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ]) +def read_hb_common(user_name,password,ip_address,port_num,db_name): + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + hbc_pid = 10 + hbc_srcName = "srvc_name" + hbc_time = 1584595881 + hbc_state = "RUNNING" + return hbc_pid, hbc_state, hbc_srcName, hbc_time + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;" + cur.execute(query_value) + rows = cur.fetchall() + hbc_pid = rows[0][0] + hbc_srcName = rows[0][1] + hbc_time = rows[0][2] + hbc_state = rows[0][3] + heartbeat.commit_and_close_db(connection_db) + cur.close() + return hbc_pid, hbc_state, hbc_srcName, hbc_time + +def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name): + current_time = int(round(time.time())) + source_name = socket.gethostname() + source_name = source_name + "-" + os.getenv('SERVICE_NAME') + envPytest = os.getenv('pytest', "") + if (envPytest != 'test'): + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False): + cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);") + query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state) + _logger.info("MSHBT:Created hb_common DB and updated new values") + cur.execute(query_value) + if(update_flg == 1): + query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state) + _logger.info("MSHBT:Updated hb_common DB with new values") + cur.execute(query_value) + heartbeat.commit_and_close_db(connection_db) + cur.close() + +def create_update_vnf_table_1(jsfile,update_db,connection_db): + with open(jsfile, 'r') as outfile: + cfg = json.load(outfile) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] + else: + cur = connection_db.cursor() + if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): + cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);") + _logger.info("MSHBT:Created vnf_table_1 table") + if(update_db == 1): + query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;" + cur.execute(query_value) + _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table") + # Put some initial values into the queue + db_query = "Select event_name from vnf_table_1" + cur.execute(db_query) + vnf_list = [item[0] for item in cur.fetchall()] + for vnf in (cfg['heartbeat_config']['vnfs']): + nfc = vnf['eventName'] + #_logger.error("MSHBT:",nfc) + validity_flag = 1 + source_name_count = 0 + missed = vnf['heartbeatcountmissed'] + intvl = vnf['heartbeatinterval'] + clloop = vnf['closedLoopControlName'] + policyVersion = vnf['policyVersion'] + policyName = vnf['policyName'] + policyScope = vnf['policyScope'] + target_type = vnf['target_type'] + target = vnf['target'] + version = vnf['version'] + + if(nfc not in vnf_list): + query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag) else: - subprocess.call(["python","/opt/app/misshtbt/bin/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ]) + query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc) + if (envPytest != 'test'): + cur.execute(query_value) + #heartbeat.commit_and_close_db(connection_db) + if (envPytest != 'test'): + cur.close() + _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") + +def hb_cbs_polling_process(pid_current): + my_file = Path("./miss_htbt_service/cbs_polling.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ]) sys.stdout.flush() + _logger.info("MSHBT:Creaated CBS polling process") + return +def hb_worker_process(config_file_path): + my_file = Path("./miss_htbt_service/htbtworker.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated Heartbeat worker process") return -_logger = get_logger.get_logger(__name__) +def db_monitoring_process(current_pid,jsfile): + my_file = Path("./miss_htbt_service/db_monitoring.py") +# if my_file.is_file(): + subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile]) +# else: +# subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile]) + sys.stdout.flush() + _logger.info("MSHBT:Creaated DB Monitoring process") + return -#main functon which reads yaml config and invokes heartbeat -#monitoring -if __name__ == '__main__': - try: - print("Heartbeat Microservice ...") - if "INURL" in os.environ.keys(): - mr_url = os.environ['INURL'] - if "INTOPIC" in os.environ.keys(): - intopic = os.environ['INTOPIC'] - if "OUTURL" in os.environ.keys(): - pol_url = os.environ['OUTURL'] - if "OUTOPIC" in os.environ.keys(): - outopic = os.environ['OUTOPIC'] - print(outopic) - multiprocessing.log_to_stderr() - logger = multiprocessing.get_logger() - logger.setLevel(logging.INFO) - my_env = os.environ.copy() - my_env["PYTHONPATH"] = my_env["PYTHONPATH"]+":/usr/local/lib/python3.6"+":./miss_htbt_service/" - my_env["PATH"] = my_env["PATH"]+":./bin/:./miss_htbt_service/" - p = subprocess.Popen(['check_health.py'],stdout=subprocess.PIPE,stderr=subprocess.STDOUT,env=my_env) - #print(p.communicate()) - jsfile='empty' - - # re-request config from config binding service - # (either broker, or json file override) - if get_cbs_config(): - current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" - msg = "current config logged to : %s" % current_runtime_config_file_name - logger.error(msg) - print(msg) +def read_hb_properties(): + #Read the hbproperties.yaml for postgress and CBS related data + s=open(hb_properties_file, 'r') + a=yaml.load(s) + + if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)): + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + else: + ip_address = os.getenv('pg_ipAddress') + port_num = os.getenv('pg_portNum') + user_name = os.getenv('pg_userName') + password = os.getenv('pg_passwd') + + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + +def fetch_json_file(): + if get_cbs_config(): + #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json" + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json" + else: + current_runtime_config_file_name = "../etc/download.json" + msg = "MSHBD:current config logged to : %s" % current_runtime_config_file_name + _logger.info(msg) with open(current_runtime_config_file_name, 'w') as outfile: json.dump(tds.c_config, outfile) - jsfile = current_runtime_config_file_name - else: - msg = "CBS Config not available using local config" - logger.error(msg) - print(msg) + if os.getenv('pytest', "") == 'test': + jsfile = current_runtime_config_file_name + else: + jsfile = "../etc/config.json" + os.system('cp ../etc/download.json ../etc/config.json') + os.remove("../etc/download.json") + else: + msg = "MSHBD:CBS Config not available, using local config" + _logger.warning(msg) my_file = Path("./etc/config.json") if my_file.is_file(): - jsfile = "./etc/config.json" + jsfile = "./etc/config.json" else: - jsfile = "../etc/config.json" - - print("opening %s " % jsfile) - with open(jsfile, 'r') as outfile: - cfg = json.load(outfile) - # Put some initial values into the queue - mr_url = cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'] - pol_url = cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url'] - jobs = [] - print(cfg['heartbeat_config']) - for vnf in (cfg['heartbeat_config']['vnfs']): - print(vnf) - nfc = vnf['nfNamingCode'] - missed = vnf['heartbeatcountmissed'] - intvl = vnf['heartbeatinterval'] - clloop = vnf['closedLoopControlName'] - print('{0} {1} {2} {3}'.format(nfc,missed,intvl,clloop)) - #Start Heartbeat monitoring process worker thread on VNFs configured - logger.info("Starting threads...") - p = multiprocessing.Process(target=checkhtbt, args=( mr_url, intopic, pol_url, outopic, nfc, missed, intvl, clloop)) - jobs.append(p) - p.start() - for j in jobs: - j.join() - print('%s.exitcode = %s' % (j.name, j.exitcode)) - except Exception as e: - _logger.error("Fatal error. Could not start missing heartbeat service due to: {0}".format(e)) + jsfile = "../etc/config.json" + msg = "MSHBT: The json file is - ", jsfile + _logger.info(msg) + return jsfile + +def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): + envPytest = os.getenv('pytest', "") + if (envPytest != 'test'): + if(update_db == 0): + create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name + _logger.info(msg) + connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name) + cur = connection_db.cursor() + if(update_db == 0): + if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False): + create_update_vnf_table_1(jsfile,update_db,connection_db) + else: + create_update_vnf_table_1(jsfile,update_db,connection_db) + heartbeat.commit_and_close_db(connection_db) + cur.close() + +def create_process(job_list, jsfile, pid_current): + if(len(job_list) == 0): + p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) + p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,)) + p1.start() + p2.start() + job_list.append(p1) + job_list.append(p2) + msg = "MSHBD:jobs list is",job_list + _logger.info(msg) + return job_list + +_logger = get_logger.get_logger(__name__) + +def main(): + try: + p = subprocess.Popen(['python3.6',ABSOLUTE_PATH3],stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + _logger.info("MSHBD:Execution Started") + job_list = [] + pid_current = os.getpid() + ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties() + msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + _logger.info(msg) + jsfile = fetch_json_file() + if(cbs_polling_required == True): + p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,)) + p3.start() + update_db = 0 + create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + state = "RECONFIGURATION" + update_flg = 0 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + msg = "MSHBD:Current process id is",pid_current + _logger.info(msg) + _logger.info("MSHBD:Now be in a continuous loop") + i=0 + while(True): + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) + msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time + _logger.info(msg) + current_time = int(round(time.time())) + time_difference = current_time - hbc_time + msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference + _logger.info(msg) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv('SERVICE_NAME')) + envPytest = os.getenv('pytest', "") + if (envPytest == 'test'): + if i == 2: + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RECONFIGURATION" + elif (i>3): + hbc_pid = pid_current + source_name = hbc_srcName + hbc_state = "RUNNING" + if (time_difference <60): + if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)): + msg = "MSHBD:config status is",hbc_state + _logger.info(msg) + if(hbc_state=="RUNNING"): + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + elif(hbc_state=="RECONFIGURATION"): + _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + jsfile = fetch_json_file() + update_db = 1 + create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) + msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + state = "RUNNING" + update_flg = 1 + create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name) + + else: + _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") + if(len(job_list)>=2): + _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") + else: + jsfile = fetch_json_file() + msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + else: + _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover") + if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)): + _logger.info("MSHBD:Initiating to become Active Instance") + if(len(job_list)>=2): + _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE") + else: + jsfile = fetch_json_file() + msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current + _logger.info(msg) + job_list = create_process(job_list, jsfile, pid_current) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name) + update_flg = 1 + create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name) + else: + _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") + time.sleep(25) + if os.getenv('pytest', "") == 'test': + i = i + 1 + if(i > 5): + _logger.info("Terminating main process for pytest") + p3.terminate() + time.sleep(1) + p3.join() + if(len(job_list)>0): + job_list[0].terminate() + time.sleep(1) + job_list[0].join() + job_list.remove(job_list[0]) + if(len(job_list)>0): + job_list[0].terminate() + time.sleep(1) + job_list[0].join() + job_list.remove(job_list[0]) + break + + except (Exception) as e: + msg = "MSHBD:Exception as %s" %(str(traceback.format_exc())) + _logger.error(msg) + + msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e) + _logger.error(msg) + +if __name__ == '__main__': + main() diff --git a/miss_htbt_service/mod/__init__.py b/miss_htbt_service/mod/__init__.py index 1875bf6..1875bf6 100755..100644 --- a/miss_htbt_service/mod/__init__.py +++ b/miss_htbt_service/mod/__init__.py diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py index 6247f4b..6247f4b 100755..100644 --- a/miss_htbt_service/mod/trapd_exit.py +++ b/miss_htbt_service/mod/trapd_exit.py diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py index c108107..d2b615f 100755..100644 --- a/miss_htbt_service/mod/trapd_get_cbs_config.py +++ b/miss_htbt_service/mod/trapd_get_cbs_config.py @@ -33,7 +33,6 @@ import string import time import traceback import collections - import mod.trapd_settings as tds from onap_dcae_cbs_docker_client.client import get_config from mod.trapd_exit import cleanup,cleanup_and_exit @@ -92,7 +91,7 @@ def get_cbs_config(): msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \ " (invalid json?) - FATAL ERROR, exiting" stdout_logger(msg) - cleanup_and_exit(1,None) + cleanup_and_exit(0,None) # recalc timeout, set default if not present try: diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py index b34c19d..b34c19d 100755..100644 --- a/miss_htbt_service/mod/trapd_http_session.py +++ b/miss_htbt_service/mod/trapd_http_session.py diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py index c89eaa3..1c40346 100755..100644 --- a/miss_htbt_service/mod/trapd_io.py +++ b/miss_htbt_service/mod/trapd_io.py @@ -36,7 +36,6 @@ import string import time import traceback import unicodedata - # dcae_snmptrap import mod.trapd_settings as tds from mod.trapd_exit import cleanup_and_exit @@ -49,327 +48,328 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -def roll_all_logs(): - """ - roll all active logs to timestamped version, open new one - based on frequency defined in files.roll_frequency - """ - - # first roll all the eelf files - # NOTE: this will go away when onap logging is standardized/available - try: - # open various ecomp logs - if any fails, exit - for fd in [tds.eelf_error_fd, tds.eelf_debug_fd, tds.eelf_audit_fd, - tds.eelf_metrics_fd, tds.arriving_traps_fd, tds.json_traps_fd]: - fd.close() - - roll_file(tds.eelf_error_file_name) - roll_file(tds.eelf_debug_file_name) - roll_file(tds.eelf_audit_file_name) - roll_file(tds.eelf_metrics_file_name) - - except Exception as e: - msg = "Error closing logs: " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - reopened_successfully = open_eelf_logs() - if not reopened_successfully: - msg = "Error re-opening EELF logs during roll-over to timestamped versions - EXITING" - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - # json log - roll_file(tds.json_traps_filename) - - try: - tds.json_traps_fd = open_file(tds.json_traps_filename) - except Exception as e: - msg = ("Error opening json_log %s : %s" % - (json_traps_filename, str(e))) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - # arriving trap log - roll_file(tds.arriving_traps_filename) - - try: - tds.arriving_traps_fd = open_file(tds.arriving_traps_filename) - except Exception as e: - msg = ("Error opening arriving traps %s : %s" % - (arriving_traps_filename, str(e))) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - +#def roll_all_logs(): +# """ +# roll all active logs to timestamped version, open new one +# based on frequency defined in files.roll_frequency +# """ +# +# # first roll all the eelf files +# # NOTE: this will go away when onap logging is standardized/available +# try: +# # open various ecomp logs - if any fails, exit +# for fd in [tds.eelf_error_fd, tds.eelf_debug_fd, tds.eelf_audit_fd, +# tds.eelf_metrics_fd, tds.arriving_traps_fd, tds.json_traps_fd]: +# fd.close() +# +# roll_file(tds.eelf_error_file_name) +# roll_file(tds.eelf_debug_file_name) +# roll_file(tds.eelf_audit_file_name) +# roll_file(tds.eelf_metrics_file_name) +# +# except Exception as e: +# msg = "Error closing logs: " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# reopened_successfully = open_eelf_logs() +# if not reopened_successfully: +# msg = "Error re-opening EELF logs during roll-over to timestamped versions - EXITING" +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# # json log +# roll_file(tds.json_traps_filename) + +## try: +# tds.json_traps_fd = open_file(tds.json_traps_filename) +# except Exception as e: +# msg = ("Error opening json_log %s : %s" % +# (json_traps_filename, str(e))) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# # arriving trap log +# roll_file(tds.arriving_traps_filename) +# +# try: +# tds.arriving_traps_fd = open_file(tds.arriving_traps_filename) +# except Exception as e: +# msg = ("Error opening arriving traps %s : %s" % +# (arriving_traps_filename, str(e))) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# # # # # # # # # # # # # # # # # # # # # fx: setup_ecomp_logs -> log in eelf format until standard # is released for python via LOG-161 # # # # # # # # # # ## # # # # # # # -def open_eelf_logs(): - """ - open various (multiple ???) logs - """ - - try: - # open various ecomp logs - if any fails, exit - - tds.eelf_error_file_name = ( - tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_error']) - tds.eelf_error_fd = open_file(tds.eelf_error_file_name) - - except Exception as e: - msg = "Error opening eelf error log : " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - try: - tds.eelf_debug_file_name = ( - tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_debug']) - tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name) - - except Exception as e: - msg = "Error opening eelf debug log : " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - try: - tds.eelf_audit_file_name = ( - tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_audit']) - tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name) - except Exception as e: - msg = "Error opening eelf audit log : " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - try: - tds.eelf_metrics_file_name = ( - tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_metrics']) - tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name) - except Exception as e: - msg = "Error opening eelf metric log : " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - return True - -# # # # # # # # # # # # # # # # # # # +#def open_eelf_logs(): +# """ +# open various (multiple ???) logs +# """ +# +# try: +# # open various ecomp logs - if any fails, exit +# +# tds.eelf_error_file_name = ( +# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_error']) +# tds.eelf_error_fd = open_file(tds.eelf_error_file_name) +# +# except Exception as e: +# msg = "Error opening eelf error log : " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# try: +# tds.eelf_debug_file_name = ( +# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_debug']) +# tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name) +# +# except Exception as e: +# msg = "Error opening eelf debug log : " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# try: +# tds.eelf_audit_file_name = ( +# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_audit']) +# tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name) +# except Exception as e: +# msg = "Error opening eelf audit log : " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# try: +# tds.eelf_metrics_file_name = ( +# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_metrics']) +# tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name) +# except Exception as e: +# msg = "Error opening eelf metric log : " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# return True +# +## # # # # # # # # # # # # # # # # # # # fx: roll_log_file -> move provided filename to timestamped version # # # # # # # # # # ## # # # # # # # -def roll_file(_loc_file_name): - """ - move active file to timestamped archive - """ - - _file_name_suffix = "%s" % (datetime.datetime.fromtimestamp(time.time()). - fromtimestamp(time.time()). - strftime('%Y-%m-%dT%H:%M:%S')) - - _loc_file_name_bak = _loc_file_name + '.' + _file_name_suffix - - # roll existing file if present - if os.path.isfile(_loc_file_name): - try: - os.rename(_loc_file_name, _loc_file_name_bak) - return True - except Exception as e: - _msg = ("ERROR: Unable to rename %s to %s" - % (_loc_file_name, - _loc_file_name_bak)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT, - tds.CODE_GENERAL, _msg) - return False - - return False - -# # # # # # # # # # # # # -# fx: open_log_file -# # # # # # # # # # # # # - - -def open_file(_loc_file_name): - """ - open _loc_file_name, return file handle - """ - - try: - # open append mode just in case so nothing is lost, but should be - # non-existent file - _loc_fd = open(_loc_file_name, 'a') - return _loc_fd - except Exception as e: - msg = "Error opening " + _loc_file_name + " append mode - " + str(e) - stdout_logger(msg) - cleanup_and_exit(1, tds.pid_file_name) - - -# # # # # # # # # # # # # -# fx: close_file -# # # # # # # # # # # # # - """ - close _loc_file_name, return True with success, False otherwise - """ - - -def close_file(_loc_fd, _loc_filename): - - try: - _loc_fd.close() - return True - except Exception as e: - msg = "Error closing %s : %s - results indeterminate" % ( - _loc_filename, str(e)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) - return False - -# # # # # # # # # # # # # # # # # # # -# fx: ecomp_logger -> log in eelf format until standard -# is released for python via LOG-161 -# # # # # # # # # # ## # # # # # # # - -def ecomp_logger(_log_type, _sev, _error_code, _msg): - """ - Log to ecomp-style logfiles. Logs include: - - Note: this will be updated when https://jira.onap.org/browse/LOG-161 - is closed/available; until then, we resort to a generic format with - valuable info in "extra=" field (?) - - :Parameters: - _msg - - :Exceptions: - none - :Keywords: - eelf logging - :Log Styles: - - :error.log: - - if CommonLogger.verbose: print("using CommonLogger.ErrorFile") - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - % (requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName, - errorCategory, errorCode, errorDescription, detailMessage)) - - error.log example: - - 2018-02-20T07:21:34,007+00:00||MainThread|snmp_log_monitor||||FATAL|900||Tue Feb 20 07:21:11 UTC 2018 CRITICAL: [a0cae74e-160e-11e8-8f9f-0242ac110002] ALL publish attempts failed to DMAPP server: dcae-mrtr-zltcrdm5bdce1.1dff83.rdm5b.tci.att.com, topic: DCAE-COLLECTOR-UCSNMP, 339 trap(s) not published in epoch_serno range: 15191112530000 - 15191112620010 - - :debug.log: - - if CommonLogger.verbose: print("using CommonLogger.DebugFile") - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - % (requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel, - severity, serverIPAddress, server, IPAddress, className, timer, detailMessage)) - - debug.log example: - - none available - - :audit.log: - - if CommonLogger.verbose: print("using CommonLogger.AuditFile") - endAuditTime, endAuditMsec = self._getTime() - if self._begTime is not None: - d = {'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, - 'endmsecs': endAuditMsec} - else: - d = {'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, - 'endmsecs': endAuditMsec} - - self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, - statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel, - severity, serverIPAddress, timer, server, IPAddress, className, unused, - processKey, customField1, customField2, customField3, customField4, - detailMessage), extra=d) - - - :metrics.log: - - self._logger.log(50,'%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ - % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, - targetEntity, targetServiceName, statusCode, responseCode, responseDescription, - instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, - IPAddress, - className, unused, processKey, targetVirtualEntity, customField1, customField2, - customField3, customField4, detailMessage), extra=d) - - metrics.log example: - - none available - - - """ - - unused = "" - - # above were various attempts at setting time string found in other - # libs; instead, let's keep it real: - t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3] - calling_fx = inspect.stack()[1][3] - - # DLFM: this entire module is a hack to override concept of prog logging - # written across multiple files (???), making diagnostics IMPOSSIBLE! - # Hoping to leverage ONAP logging libraries & standards when available - - # catch invalid log type - if _log_type < 1 or _log_type > 5: - msg = ("INVALID log type: %s " % _log_type) - _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s" - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, (msg + _msg))) - try: - tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec))) - except Exception as e: - stdout_logger(str(_out_rec)) - - return False - - if _sev >= tds.minimum_severity_to_log: - # log to appropriate eelf log (different files ??) - if _log_type == tds.LOG_TYPE_ERROR: - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - try: - tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec))) - except Exception as e: - stdout_logger(str(_out_rec)) - elif _log_type == tds.LOG_TYPE_AUDIT: - # log message in AUDIT format - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - try: - tds.eelf_audit_fd.write('%s|%s\n' % (t_out, str(_out_rec))) - except Exception as e: - stdout_logger(str(_out_rec)) - elif _log_type == tds.LOG_TYPE_METRICS: - # log message in METRICS format - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - try: - tds.eelf_metrics_fd.write('%s|%s\n' % (t_out, str(_out_rec))) - except Exception as e: - stdout_logger(str(_out_rec)) - - # DEBUG *AND* others - there *MUST BE* a single time-sequenced log for diagnostics! - # DLFM: too much I/O !!! - # always write to debug; we need ONE logfile that has time-sequence full view !!! - # log message in DEBUG format - _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s" - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - try: - tds.eelf_debug_fd.write('%s|%s\n' % (t_out, str(_out_rec))) - except Exception as e: - stdout_logger(str(_out_rec)) - - return True - -# # # # # # # # # # # # # -# fx: stdout_logger -# # # # # # # # # # # # # +#def roll_file(_loc_file_name): +# """ +# move active file to timestamped archive +# """ +# +# _file_name_suffix = "%s" % (datetime.datetime.fromtimestamp(time.time()). +# fromtimestamp(time.time()). +# strftime('%Y-%m-%dT%H:%M:%S')) +# +# _loc_file_name_bak = _loc_file_name + '.' + _file_name_suffix +# +# # roll existing file if present +# if os.path.isfile(_loc_file_name): +# try: +# os.rename(_loc_file_name, _loc_file_name_bak) +# return True +# except Exception as e: +# _msg = ("ERROR: Unable to rename %s to %s" +# % (_loc_file_name, +# _loc_file_name_bak)) +# ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT, +# tds.CODE_GENERAL, _msg) +# return False +# +# return False +# +## # # # # # # # # # # # # +## fx: open_log_file +## # # # # # # # # # # # # +# +# +#def open_file(_loc_file_name): +# """ +# open _loc_file_name, return file handle +# """ +# +# try: +# # open append mode just in case so nothing is lost, but should be +# # non-existent file +# _loc_fd = open(_loc_file_name, 'a') +# return _loc_fd +# except Exception as e: +# msg = "Error opening " + _loc_file_name + " append mode - " + str(e) +# stdout_logger(msg) +# cleanup_and_exit(1, tds.pid_file_name) +# +# +## # # # # # # # # # # # # +## fx: close_file +## # # # # # # # # # # # # +# """ +# close _loc_file_name, return True with success, False otherwise +# """ +# +# +#def close_file(_loc_fd, _loc_filename): +# +# try: +# +# _loc_fd.close() +# return True +# except Exception as e: +# msg = "Error closing %s : %s - results indeterminate" % ( +# _loc_filename, str(e)) +# ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) +# return False +# +## # # # # # # # # # # # # # # # # # # +## fx: ecomp_logger -> log in eelf format until standard +## is released for python via LOG-161 +## # # # # # # # # # ## # # # # # # # +# +#def ecomp_logger(_log_type, _sev, _error_code, _msg): +# """ +# Log to ecomp-style logfiles. Logs include: +# +# Note: this will be updated when https://jira.onap.org/browse/LOG-161 +# is closed/available; until then, we resort to a generic format with +# valuable info in "extra=" field (?) +# +# :Parameters: +# _msg - +# :Exceptions: +# none +# :Keywords: +# eelf logging +# :Log Styles: +# +# :error.log: +# +# if CommonLogger.verbose: print("using CommonLogger.ErrorFile") +# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ +# % (requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName, +# errorCategory, errorCode, errorDescription, detailMessage)) +# +# error.log example: +# +# 2018-02-20T07:21:34,007+00:00||MainThread|snmp_log_monitor||||FATAL|900||Tue Feb 20 07:21:11 UTC 2018 CRITICAL: [a0cae74e-160e-11e8-8f9f-0242ac110002] ALL publish attempts failed to DMAPP server: dcae-mrtr-zltcrdm5bdce1.1dff83.rdm5b.tci.att.com, topic: DCAE-COLLECTOR-UCSNMP, 339 trap(s) not published in epoch_serno range: 15191112530000 - 15191112620010 +# +# :debug.log: +# +# if CommonLogger.verbose: print("using CommonLogger.DebugFile") +# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ +# % (requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel, +# severity, serverIPAddress, server, IPAddress, className, timer, detailMessage)) +# +# debug.log example: +# +# none available +# +# :audit.log: +# +# if CommonLogger.verbose: print("using CommonLogger.AuditFile") +# endAuditTime, endAuditMsec = self._getTime() +# if self._begTime is not None: +# d = {'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime, +# 'endmsecs': endAuditMsec} +# else: +# d = {'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime, +# 'endmsecs': endAuditMsec} +# +# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ +# % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, +# statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel, +# severity, serverIPAddress, timer, server, IPAddress, className, unused, +# processKey, customField1, customField2, customField3, customField4, +# detailMessage), extra=d) +# +# +# :metrics.log: +# +# self._logger.log(50,'%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \ +# % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName, +# targetEntity, targetServiceName, statusCode, responseCode, responseDescription, +# instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server, +# IPAddress, +# className, unused, processKey, targetVirtualEntity, customField1, customField2, +# customField3, customField4, detailMessage), extra=d) +# +# metrics.log example: +# +# none available +# +# +# """ +# +# unused = "" +# +# # above were various attempts at setting time string found in other +# # libs; instead, let's keep it real: +# t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3] +# calling_fx = inspect.stack()[1][3] +# +# # DLFM: this entire module is a hack to override concept of prog logging +# # written across multiple files (???), making diagnostics IMPOSSIBLE! +# # Hoping to leverage ONAP logging libraries & standards when available +# +# # catch invalid log type +# if _log_type < 1 or _log_type > 5: +# msg = ("INVALID log type: %s " % _log_type) +# _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s" +# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, (msg + _msg))) +# try: +# tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec))) +# except Exception as e: +# stdout_logger(str(_out_rec)) +# +# return False +# +# if _sev >= tds.minimum_severity_to_log: +# # log to appropriate eelf log (different files ??) +# if _log_type == tds.LOG_TYPE_ERROR: +# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' +# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) +# try: +# tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec))) +# except Exception as e: +# stdout_logger(str(_out_rec)) +# elif _log_type == tds.LOG_TYPE_AUDIT: +# # log message in AUDIT format +# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' +# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) +# try: +# tds.eelf_audit_fd.write('%s|%s\n' % (t_out, str(_out_rec))) +# except Exception as e: +# stdout_logger(str(_out_rec)) +# elif _log_type == tds.LOG_TYPE_METRICS: +# # log message in METRICS format +# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' +# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) +# try: +# tds.eelf_metrics_fd.write('%s|%s\n' % (t_out, str(_out_rec))) +# except Exception as e: +# stdout_logger(str(_out_rec)) +# +# # DEBUG *AND* others - there *MUST BE* a single time-sequenced log for diagnostics! +# # DLFM: too much I/O !!! +# # always write to debug; we need ONE logfile that has time-sequence full view !!! +# # log message in DEBUG format +# _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s" +# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) +# try: +# tds.eelf_debug_fd.write('%s|%s\n' % (t_out, str(_out_rec))) +# except Exception as e: +# stdout_logger(str(_out_rec)) +# +# return True +# +## # # # # # # # # # # # # +## fx: stdout_logger +## # # # # # # # # # # # # def stdout_logger(_msg): diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py index c6ef76e..c6ef76e 100755..100644 --- a/miss_htbt_service/mod/trapd_runtime_pid.py +++ b/miss_htbt_service/mod/trapd_runtime_pid.py diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py index be87e26..be87e26 100755..100644 --- a/miss_htbt_service/mod/trapd_settings.py +++ b/miss_htbt_service/mod/trapd_settings.py diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py new file mode 100644 index 0000000..a76c886 --- /dev/null +++ b/miss_htbt_service/mod/trapd_vnf_table.py @@ -0,0 +1,106 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +## Author Kiran Mandal (km386e) +""" +trapd_vnf_table verifies the successful creation of DB Tables. +""" + + +import psycopg2 +import os +import sys +import htbtworker as pm +import misshtbtd as db +import config_notif as cf +import cbs_polling as cbs +import logging +import get_logger +import yaml +import os.path as path + +prog_name = os.path.basename(__file__) +hb_properties_file = path.abspath(path.join(__file__, "../../config/hbproperties.yaml")) + +def hb_properties(): + #Read the hbproperties.yaml for postgress and CBS related data + s=open(hb_properties_file, 'r') + a=yaml.load(s) + ip_address = a['pg_ipAddress'] + port_num = a['pg_portNum'] + user_name = a['pg_userName'] + password = a['pg_passwd'] + dbName = a['pg_dbName'] + db_name = dbName.lower() + cbs_polling_required = a['CBS_polling_allowed'] + cbs_polling_interval = a['CBS_polling_interval'] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + +ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() + +def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name): + connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + # cur = connection_db.cursor() + try: + _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1") + except Exception as e: + return None + + return _db_status + +def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name): + + connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + # cur = connection_db.cursor() + try: + _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2") + except Exception as e: + return None + + return _db_status + +def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name): + + connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + #cur = connection_db.cursor() + try: + _db_status=pm.db_table_creation_check(connection_db,"hb_common") + except Exception as e: + return None + + return _db_status + + +def verify_cbsPolling_required(): + try: + _cbspolling_status=cf.config_notif_run() + except Exception as e: + return None + + return _cbspolling_status + +def verify_cbspolling(): + try: + _cbspolling=cbs.currentpidMain(10) + except Exception as e: + return None + + return _cbspolling |