aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service
diff options
context:
space:
mode:
authorSrikanthNaidu <sn8492@att.com>2018-12-10 21:46:40 +0530
committerSrikanthNaidu <sn8492@att.com>2018-12-12 18:45:52 +0530
commit20110ffeb5071193e7b437e797636d9d6318dcd4 (patch)
treeebd4e64715f4cafc7261b2eff9990e4faa53d050 /miss_htbt_service
parent5712f01385eaa05178279aa7e730b3bdde3d1b80 (diff)
Heartbeat Microservice Support
Heartbeat service monitors missing HB notification Issue-ID: DCAEGEN2-267 Change-Id: I21f36056e9509a167bff476231a6bbd661aca1b9 Signed-off-by: SrikanthNaidu <sn8492@att.com>
Diffstat (limited to 'miss_htbt_service')
-rw-r--r--miss_htbt_service/cbs_polling.py62
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/check_health.py1
-rw-r--r--miss_htbt_service/config/config.yaml16
-rw-r--r--miss_htbt_service/config/hbproperties.yaml11
-rw-r--r--miss_htbt_service/config_notif.py155
-rw-r--r--miss_htbt_service/db_monitoring.py238
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/get_logger.py3
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/htbtworker.py422
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/misshtbt.sh0
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/misshtbtd.py461
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/__init__.py0
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_exit.py0
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_get_cbs_config.py3
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_http_session.py0
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_io.py624
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_runtime_pid.py0
-rw-r--r--[-rwxr-xr-x]miss_htbt_service/mod/trapd_settings.py0
-rw-r--r--miss_htbt_service/mod/trapd_vnf_table.py106
18 files changed, 1486 insertions, 616 deletions
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py
new file mode 100644
index 0000000..4212ab7
--- /dev/null
+++ b/miss_htbt_service/cbs_polling.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author Prakash Hosangady(ph553f@att.com)
+# CBS Polling
+# Set the hb_common table with state="RECONFIGURATION" periodically
+# to get the new configuration downloaded
+
+import requests
+import sched, datetime, time
+import string
+import sys
+import os
+import socket
+import htbtworker as pm
+import misshtbtd as db
+import logging
+import get_logger
+_logger = get_logger.get_logger(__name__)
+
+
+def pollCBS(current_pid):
+
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties()
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ msg="CBSP:Main process ID in hb_common is %d",hbc_pid
+ _logger.info(msg)
+ msg="CBSP:My parent process ID is %d",current_pid
+ _logger.info(msg)
+ msg="CBSP:CBS Polling interval is %d", cbs_polling_interval
+ _logger.info(msg)
+ time.sleep(cbs_polling_interval)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ #connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ #cur = connection_db.cursor()
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME'))
+ result= True
+ if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"):
+ _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION")
+ state = "RECONFIGURATION"
+ update_flg = 1
+ db.create_update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name)
+ else:
+ _logger.info("CBSP:Inactive instance or hb_common state is not RUNNING")
+ return result
+if __name__ == "__main__":
+ current_pid = sys.argv[1]
+ while(True):
+ pollCBS(current_pid)
diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py
index ae61881..fb99584 100755..100644
--- a/miss_htbt_service/check_health.py
+++ b/miss_htbt_service/check_health.py
@@ -16,6 +16,7 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib import parse
diff --git a/miss_htbt_service/config/config.yaml b/miss_htbt_service/config/config.yaml
deleted file mode 100644
index 0dcc8bf..0000000
--- a/miss_htbt_service/config/config.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-global:
- host: localhost
- message_router_url: http://msgrouter.att.com:3904
-# Missing heartbeats
-# Heartbeat interval
-# Input topic
-# Output topic
-# ClosedLoopControlName
-vnfs:
- vnfa:
- - 3
- - 60
- - VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT
- - DCAE-POLICY-HILOTCA-EVENT-OUTPUT
- - ControlLoopEvent1
-
diff --git a/miss_htbt_service/config/hbproperties.yaml b/miss_htbt_service/config/hbproperties.yaml
new file mode 100644
index 0000000..b0806e4
--- /dev/null
+++ b/miss_htbt_service/config/hbproperties.yaml
@@ -0,0 +1,11 @@
+#Postgres database input
+#pg_ipAddress: 127.0.0.1
+pg_ipAddress: 10.0.4.1
+pg_portNum: 5432
+pg_userName: postgres
+pg_passwd: postgres
+pg_dbName: hb_vnf
+
+#Periodic polling of CBS config download
+CBS_polling_allowed: True
+CBS_polling_interval: 300
diff --git a/miss_htbt_service/config_notif.py b/miss_htbt_service/config_notif.py
new file mode 100644
index 0000000..242b0e9
--- /dev/null
+++ b/miss_htbt_service/config_notif.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author Prakash Hosangady (ph553f)
+# Read the hb_common table
+# Update the state to RECONFIGURATION and save the hb_common table
+
+import os
+import sched, datetime, time
+import string
+import sys
+import socket
+import yaml
+import psycopg2
+from pathlib import Path
+import os.path as path
+
+hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
+
+def postgres_db_open(username,password,host,port,database_name):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ return True
+ connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+ return connection
+
+def db_table_creation_check(connection_db,table_name):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ return True
+ try:
+ cur = connection_db.cursor()
+ query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+ cur.execute(query_db)
+ database_names = cur.fetchone()
+ if(database_names is not None):
+ if(table_name in database_names):
+ print("HB_Notif::Postgres has already has table -", table_name)
+ return True
+ else:
+ print("HB_Notif::Postgres does not have table - ", table_name)
+ return False
+ except (psycopg2.DatabaseError, e):
+ print('COMMON:Error %s' % e)
+ finally:
+ cur.close()
+
+def commit_and_close_db(connection_db):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ return True
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.close()
+ return True
+ except(psycopg2.DatabaseError, e):
+ return False
+
+def read_hb_properties():
+ #Read the hbproperties.yaml for postgress and CBS related data
+ s=open(hb_properties_file, 'r')
+ a=yaml.load(s)
+ if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
+ ip_address = a['pg_ipAddress']
+ port_num = a['pg_portNum']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
+ else:
+ ip_address = os.getenv('pg_ipAddress')
+ port_num = os.getenv('pg_portNum')
+ user_name = os.getenv('pg_userName')
+ password = os.getenv('pg_passwd')
+
+ dbName = a['pg_dbName']
+ db_name = dbName.lower()
+ cbs_polling_required = a['CBS_polling_allowed']
+ cbs_polling_interval = a['CBS_polling_interval']
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
+def read_hb_common(user_name,password,ip_address,port_num,db_name):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ hbc_pid = 10
+ hbc_srcName = "srvc_name"
+ hbc_time = 1541234567
+ hbc_state = "RUNNING"
+ return hbc_pid, hbc_state, hbc_srcName, hbc_time
+ connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;"
+ cur.execute(query_value)
+ rows = cur.fetchall()
+ print("HB_Notif::hb_common contents - ", rows)
+ hbc_pid = rows[0][0]
+ hbc_srcName = rows[0][1]
+ hbc_time = rows[0][2]
+ hbc_state = rows[0][3]
+ commit_and_close_db(connection_db)
+ cur.close()
+ return hbc_pid, hbc_state, hbc_srcName, hbc_time
+
+def update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
+ current_time = int(round(time.time()))
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME'))
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ return True
+ connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state)
+ cur.execute(query_value)
+ commit_and_close_db(connection_db)
+ cur.close()
+ return True
+
+#if __name__ == "__main__":
+def config_notif_run():
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties()
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ return True
+ connection_db = postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ if(db_table_creation_check(connection_db,"hb_common") == False):
+ print("HB_Notif::ERROR::hb_common table not exists - No config download")
+ connection_db.close()
+ else:
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
+ state = "RECONFIGURATION"
+ update_flg = 1
+ ret = update_hb_common(update_flg, hbc_pid, state, user_name,password,ip_address,port_num,db_name)
+ if (ret == True):
+ print("HB_Notif::hb_common table updated with RECONFIGURATION state")
+ commit_and_close_db(connection_db)
+ return True
+ else:
+ print("HB_Notif::Failure updating hb_common table")
+ commit_and_close_db(connection_db)
+ return False
+
+ cur.close()
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
new file mode 100644
index 0000000..6113be2
--- /dev/null
+++ b/miss_htbt_service/db_monitoring.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python3
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author Prakash Hosangady(ph553f)
+# DB Monitoring
+# Tracks Heartbeat messages on each of the VNFs stored in postgres DB
+# and generates Missing Heartbeat signal for Policy Engine
+
+import requests
+import math
+import sched, datetime, time
+import json
+import string
+import sys
+import os
+import socket
+import requests
+import htbtworker as pm
+import misshtbtd as db
+import logging
+import get_logger
+
+_logger = get_logger.get_logger(__name__)
+
+def db_monitoring(current_pid,json_file,user_name,password,ip_address,port_num,db_name):
+ while(True):
+ time.sleep(20)
+ with open(json_file, 'r') as outfile:
+ cfg = json.load(outfile)
+ pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url'])
+
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME'))
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ break
+ connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ if(int(current_pid)==int(hbc_pid) and source_name==hbc_srcName and hbc_state == "RUNNING"):
+ _logger.info("DBM: Active DB Monitoring Instance")
+ db_query = "Select event_name from vnf_table_1"
+ cur.execute(db_query)
+ vnf_list = [item[0] for item in cur.fetchall()]
+ for event_name in vnf_list:
+ query_value = "SELECT current_state FROM hb_common;"
+ cur.execute(query_value)
+ rows = cur.fetchall()
+ hbc_state = rows[0][0]
+ if( hbc_state == "RECONFIGURATION"):
+ _logger.info("DBM:Waiting for hb_common state to become RUNNING")
+ break
+
+ db_query = "Select validity_flag,source_name_count,heartbeat_interval,heartbeat_missed_count,closed_control_loop_name,policy_version, policy_name,policy_scope, target_type,target,version from vnf_table_1 where event_name= '%s'" %(event_name)
+ cur.execute(db_query)
+ rows = cur.fetchall()
+ validity_flag = rows[0][0]
+ source_name_count = rows[0][1]
+ heartbeat_interval = rows[0][2]
+ heartbeat_missed_count = rows[0][3]
+ closed_control_loop_name = rows[0][4]
+ policy_version = rows[0][5]
+ policy_name = rows[0][6]
+ policy_scope = rows[0][7]
+ target_type = rows[0][8]
+ target = rows[0][9]
+ version = rows[0][10]
+ comparision_time = (heartbeat_interval*heartbeat_missed_count)*1000
+ if (validity_flag ==1):
+ for source_name_key in range(source_name_count):
+ epoc_time = int(round(time.time()*1000))
+ epoc_query = "Select last_epo_time,source_name,cl_flag from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(event_name,(source_name_key+1))
+ cur.execute(epoc_query)
+ row = cur.fetchall()
+ if (len(row)==0):
+ continue
+ epoc_time_sec = row[0][0]
+ srcName = row[0][1]
+ cl_flag = row[0][2]
+ vnfName = event_name
+ if((epoc_time-epoc_time_sec)>comparision_time and cl_flag ==0):
+ msg="DBM:Time to raise Control Loop Event for target type - ", target_type
+ _logger.info(msg)
+ if(target_type == "VNF"):
+ json_object = json.dumps({
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": { "generic-vnf.vnf-name": srcName} ,
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ elif(target_type == "VM"):
+ json_object = json.dumps({
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": { "vserver.vserver-name": srcName} ,
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ else:
+ continue
+ payload = json_object
+ msg="DBM: CL Json object is", json_object
+ _logger.info(msg)
+ #psend_url = pol_url+'DefaultGroup/1?timeout=15000'
+ psend_url = pol_url
+ msg="DBM:",psend_url
+ _logger.info(msg)
+ msg="DBM:DB monitoring raising alarm event "+psend_url
+ _logger.info(msg)
+ #Send response for policy on output topic
+ r = requests.post(psend_url, data=payload)
+ msg="DBM:",r.status_code, r.reason
+ _logger.info(msg)
+ ret = r.status_code
+ msg="DBM:Status code after raising the control loop event is",ret
+ _logger.info(msg)
+ cl_flag = 1
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ cur.execute(update_query)
+ connection_db.commit()
+ elif((epoc_time - epoc_time_sec) < comparision_time and cl_flag ==1):
+ msg="DBM:Time to clear Control Loop Event for target type - ", target_type
+ _logger.info(msg)
+ epoc_time = int(round(time.time()))
+ #last_date_time = datetime.datetime.now()
+ if(target_type == "VNF"):
+ json_object = json.dumps({
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": { "generic-vnf.vnf-name": srcName} ,
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ elif(target_type == "VM"):
+ json_object = json.dumps({
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": { "vserver.vserver-name": srcName} ,
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE"
+ });
+ else:
+ continue
+ payload = json_object
+ msg="DBM: CL Json object is", json_object
+ _logger.info(msg)
+ #psend_url = pol_url+'DefaultGroup/1?timeout=15000'
+ psend_url = pol_url
+ msg="DBM:",psend_url
+ _logger.info(msg)
+ msg="DBM:Heartbeat Dead raising alarm event "+psend_url
+ _logger.info(msg)
+ #Send response for policy on output topic
+ r = requests.post(psend_url, data=payload)
+ msg="DBM:",r.status_code, r.reason
+ _logger.info(msg)
+ ret = r.status_code
+ msg="DBM:Status code after raising the control loop event is",ret
+ _logger.info(msg)
+ cl_flag = 0
+ update_query = "UPDATE vnf_table_2 SET CL_FLAG=%d where EVENT_NAME ='%s' and source_name_key=%d" %(cl_flag,event_name,(source_name_key+1))
+ cur.execute(update_query)
+ connection_db.commit()
+
+ else:
+ msg="DBM:DB Monitoring is ignored for %s since validity flag is 0" %(event_name)
+ _logger.info(msg)
+
+ for source_name_key in range(source_name_count):
+ delete_query_table2 = "DELETE FROM vnf_table_2 WHERE EVENT_NAME = '%s' and source_name_key=%d;" %(event_name,source_name_key)
+ cur.execute(delete_query_table2)
+ delete_query = "DELETE FROM vnf_table_1 WHERE EVENT_NAME = '%s';" %(event_name)
+ cur.execute(delete_query)
+ connection_db.commit()
+ """
+ Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2
+ """
+ else:
+ msg="DBM:Inactive instance or hb_common state is not RUNNING"
+ _logger.info(msg)
+ pm.commit_and_close_db(connection_db)
+ cur.close()
+ break;
+
+if __name__ == "__main__":
+ _logger.info("DBM: DBM Process started")
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties()
+ current_pid = sys.argv[1]
+ jsfile = sys.argv[2]
+ msg="DBM:Parent process ID and json file name",current_pid, jsfile
+ _logger.info(msg)
+ while (True):
+ db_monitoring(current_pid,jsfile,user_name,password,ip_address,port_num,db_name)
diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py
index a18ed5c..e8d008c 100755..100644
--- a/miss_htbt_service/get_logger.py
+++ b/miss_htbt_service/get_logger.py
@@ -22,7 +22,8 @@ import logging
'''Configures the module root logger'''
root = logging.getLogger()
if root.handlers:
- root.handlers.clear()
+ #root.handlers.clear()
+ del root.handlers[:]
formatter = logging.Formatter('%(asctime)s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(levelname)s | %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 6123386..5b62943 100755..100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright 2017 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright 2018 AT&T Intellectual Property, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,217 +13,235 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-# Author Gokul Singaraju gs244f@att.com
+# Author Prakash Hosangady(ph553f@att.com)
# Simple Microservice
# Tracks Heartbeat messages on input topic in DMaaP
-# and generates Missing Heartbeat signal for Policy Engine
+# and poppulate the information in postgres DB
+import psycopg2
import requests
-import math
-import sched, datetime, time
-import json
-import string
-import sys
+import os
+import json,sys,time
+import misshtbtd as db
+import logging
+import get_logger
+import os.path as path
+_logger = get_logger.get_logger(__name__)
-# Initialise tracking hash tables
-intvl = 60
-missing_htbt = 2
-#tracks last epoch time
-hearttrack = {}
-#tracks sequence number
-heartstate = {}
-#tracks sequence number differences
-heartflag = {}
-#saves heartbeat message for policy
-heartmsg = {}
-mr_url = 'http://mrrouter.onap.org:3904'
-pol_url = 'http://mrrouter.onap.org:3904'
-intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
-outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
-nfc = "vVNF"
-cl_loop = 'ControlLoopEvent1'
-periodic_scheduler = None
+def read_json_file(i):
+ if (i==0):
+ with open (path.abspath(path.join(__file__, "../../tests/test1.json")), "r") as outfile:
+ cfg = json.load(outfile)
+ elif (i == 1):
+ with open (path.abspath(path.join(__file__, "../../tests/test2.json")), "r") as outfile:
+ cfg = json.load(outfile)
+ elif (i ==2):
+ with open( path.abspath(path.join(__file__, "../../tests/test3.json")), 'r') as outfile:
+ cfg = json.load(outfile)
+ return cfg
-# Checks for heartbeat event on periodic basis
-class PeriodicScheduler(object):
- def __init__(self):
- self.scheduler = sched.scheduler(time.time, time.sleep)
-
- def setup(self, interval, action, actionargs=()):
- #print("Args are :", locals())
- action(*actionargs)
- self.scheduler.enter(interval, 1, self.setup,(interval, action, actionargs))
- def run(self):
- self.scheduler.run()
+def process_msg(jsfile,user_name, password, ip_address, port_num, db_name):
+ global mr_url
+ i=0
+ sleep_duration = 10
+ while(True):
+ time.sleep(sleep_duration)
+ with open(jsfile, 'r') as outfile:
+ cfg = json.load(outfile)
+ mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'])
- def stop(self):
- list(map(self.scheduler.cancel, self.scheduler.queue))
+ while(True):
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
+ if(hbc_state == "RECONFIGURATION"):
+ _logger.info("HBT:Waiting for hb_common state to become RUNNING")
+ time.sleep(10)
+ else:
+ break
+
+ if(os.getenv('pytest', "") == 'test'):
+ eventnameList = ["Heartbeat_vDNS","Heartbeat_vFW","Heartbeat_xx"]
+ else:
+ connection_db = postgres_db_open(user_name, password, ip_address, port_num, db_name)
+ cur = connection_db.cursor()
+ db_query = "Select event_name from vnf_table_1"
+ cur.execute(db_query)
+ eventnameList = [item[0] for item in cur.fetchall()]
+ msg="\n\nHBT:eventnameList values ", eventnameList
+ _logger.info(msg)
+ if "groupID" not in os.environ or "consumerID" not in os.environ:
+ get_url = mr_url + 'DefaultGroup/1?timeout=15000'
+ else:
+ get_url = mr_url + os.getenv('groupID') + '/' + os.getenv('consumerID') + '?timeout=15000'
+ msg="HBT:Getting :"+get_url
+ _logger.info(msg)
+ if(os.getenv('pytest', "") == 'test'):
+ jsonobj = read_json_file(i)
+ jobj = []
+ jobj.append(jsonobj)
+ i=i+1
+ msg="HBT:newly received test message", jobj
+ _logger.info(msg)
+ if (i >= 3):
+ i=0
+ break
+ else:
+ res = requests.get(get_url)
+ msg="HBT:",res.text
+ _logger.info(msg)
+ inputString = res.text
+ #If mrstatus in message body indicates some information, not json msg.
+ if ("mrstatus" in inputString):
+ if (sleep_duration < 60):
+ sleep_duration = sleep_duration + 10
+ continue
+ jlist = inputString.split('\n');
+ # Process the DMaaP input message retreived
+ for line in jlist:
+ try:
+ jobj = json.loads(line)
+ except ValueError:
+ msg='HBT:Decoding JSON has failed'
+ _logger.error(msg)
+ continue
+ if len(jobj) == 0:
+ continue
+ for item in jobj:
+ try:
+ if(os.getenv('pytest', "") == 'test'):
+ jitem = jsonobj
+ else:
+ jitem = json.loads(item)
+ srcname = (jitem['event']['commonEventHeader']['sourceName'])
+ lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+ seqnum = (jitem['event']['commonEventHeader']['sequence'])
+ eventName = (jitem['event']['commonEventHeader']['eventName'])
+ except(Exception) as err:
+ msg = "HBT message process error - ",err
+ _logger.error(msg)
+ continue
+ msg="HBT:Newly received HB event values ::", eventName,lastepo,srcname
+ _logger.info(msg)
+ if(db_table_creation_check(connection_db,"vnf_table_2") ==False):
+ msg="HBT:Creating vnf_table_2"
+ _logger.info(msg)
+ cur.execute("CREATE TABLE vnf_table_2 (EVENT_NAME varchar , SOURCE_NAME_KEY integer , PRIMARY KEY(EVENT_NAME,SOURCE_NAME_KEY),LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer);")
+ else:
+ msg="HBT:vnf_table_2 is already there"
+ _logger.info(msg)
+ if(eventName in eventnameList):
+ db_query = "Select source_name_count from vnf_table_1 where event_name='%s'" %(eventName)
+ msg="HBT:",db_query
+ _logger.info(msg)
+ if(os.getenv('pytest', "") == 'test'):
+ break
+ cur.execute(db_query)
+ row = cur.fetchone()
+ source_name_count = row[0]
+ source_name_key = source_name_count+1
+ cl_flag = 0
+ if(source_name_count==0):
+ msg="HBT: Insert entry in table_2,source_name_count=0 : ",row
+ _logger.info(msg)
+ query_value = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+ cur.execute(query_value)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+ cur.execute(update_query)
+ else:
+ msg="HBT:event name, source_name & source_name_count are",eventName, srcname, source_name_count
+ _logger.info(msg)
+ for source_name_key in range(source_name_count):
+ epoc_query = "Select source_name from vnf_table_2 where event_name= '%s' and source_name_key=%d" %(eventName,(source_name_key+1))
+ msg="HBT:eppc query is",epoc_query
+ _logger.info(msg)
+ cur.execute(epoc_query)
+ row = cur.fetchall()
+ if (len(row)==0):
+ continue
+ db_srcname = row[0][0]
+ if (db_srcname == srcname):
+ msg="HBT: Update vnf_table_2 : ",source_name_key, row
+ _logger.info(msg)
+ update_query = "UPDATE vnf_table_2 SET LAST_EPO_TIME='%d',SOURCE_NAME='%s' where EVENT_NAME='%s' and SOURCE_NAME_KEY=%d" %(lastepo,srcname,eventName,(source_name_key+1))
+ cur.execute(update_query)
+ source_name_key = source_name_count
+ break
+ else:
+ continue
+ msg="HBT: The source_name_key and source_name_count are ", source_name_key, source_name_count
+ _logger.info(msg)
+ if (source_name_count == (source_name_key+1)):
+ source_name_key = source_name_count+1
+ msg="HBT: Insert entry in table_2 : ",row
+ _logger.info(msg)
+ insert_query = "INSERT INTO vnf_table_2 VALUES('%s',%d,%d,'%s',%d);" %(eventName,source_name_key,lastepo,srcname,cl_flag)
+ cur.execute(insert_query)
+ update_query = "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT='%d' where EVENT_NAME ='%s'" %(source_name_key,eventName)
+ cur.execute(update_query)
+ else:
+ _logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
+ commit_db(connection_db)
+ commit_and_close_db(connection_db)
+ if(os.getenv('pytest', "") != 'test'):
+ cur.close()
+
+def postgres_db_open(username,password,host,port,database_name):
+
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ connection = psycopg2.connect(database=database_name, user = username, password = password, host = host, port =port)
+ return connection
-# Process the heartbeat event on input topic
-def periodic_event():
- global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
- ret = 0
- #print("Args are :", locals())
- print("{0} Checking...".format(datetime.datetime.now()))
- #Read heartbeat
- #get_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
- get_url = mr_url+'/DefaultGroup/1?timeout=15000'
- print("Getting :"+get_url)
- try:
- res = requests.get(get_url)
- #print(res)
- #print(res.headers)
- print(res.text)
- #print(res.json)
- inputString = res.text
- #jlist = json.loads(inputString)
- jlist = inputString.split('\n');
- #print("List:"+jlist[0])
- # Process the DMaaP input message retreived
- for line in jlist:
- print("Line:"+line)
- try:
- jobj = json.loads(line)
- except ValueError:
- print('Decoding JSON has failed')
- continue
- #print(jobj)
- srcid = (jobj['event']['commonEventHeader']['sourceId'])
- lastepo = (jobj['event']['commonEventHeader']['lastEpochMicrosec'])
- seqnum = (jobj['event']['commonEventHeader']['sequence'])
- nfcode = (jobj['event']['commonEventHeader']['nfNamingCode'])
- if( nfcode and nfc != nfcode):
- continue
- if( srcid in hearttrack ):
- tdiff = lastepo - hearttrack[srcid]
- sdiff = seqnum - heartstate[srcid]
- print("Existing source time diff :"+str(tdiff)+" seqdiff :"+str(sdiff))
- # check time difference is within limits and seq num is less than allowed
- if((0 <= tdiff <= 61000000) and sdiff < missing_htbt):
- print("Heartbeat Alive...")
- hearttrack[srcid] = lastepo
- heartstate[srcid] = seqnum;
- heartflag[srcid] = sdiff;
- heartmsg[srcid] = jobj;
- else:
- jobj["internalHeaderFields"] = json.dumps({
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": cl_loop,
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- });
- heartmsg[srcid] = jobj;
- payload = heartmsg[srcid]
- print(payload)
- #psend_url = pol_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
- psend_url = pol_url+'/DefaultGroup/1?timeout=15000'
- print(psend_url)
- print("Heartbeat Dead raising alarm event "+psend_url)
- #Send response for policy on output topic
- r = requests.post(psend_url, data=payload)
- print(r.status_code, r.reason)
- ret = r.status_code
- del heartstate[srcid]
- del hearttrack[srcid]
- del heartflag[srcid]
- else:
- print("Adding new source")
- hearttrack[srcid] = lastepo
- heartstate[srcid] = seqnum
- heartflag[srcid] = 1
- heartmsg[srcid] = jobj;
- ret = 1
- chkeys = []
- for key in heartstate.keys():
- print(key,heartstate[key])
- if( heartflag[key] == 0 ):
- print("Heartbeat Dead raise alarm event"+key)
- chkeys.append( key )
- #print payload
- heartmsg[key]["internalHeaderFields"] = json.dumps({
- "closedLoopFlag": "True",
- "eventTag": "hp.Heartbeat Service.20171022.8447964515",
- "collectorTimeStamp": "Sun, 10 22 2017 03:04:27 GMT",
- "lastDatetime": "Sun, 22 Oct 2017 03:06:32 +0000",
- "closedLoopControlName": cl_loop,
- "firstDatetime": "Sun, 22 Oct 2017 03:06:32 +0000"
- })
- payload = heartmsg[key]
- print(payload)
- send_url = pol_url+'/DefaultGroup/1?timeout=15000'
- print(send_url)
- r = requests.post(send_url, data=payload)
- print(r.status_code, r.reason)
- ret = r.status_code
- heartflag[key] = 0
- for chkey in chkeys:
- print(chkey)
- del heartstate[chkey]
- del hearttrack[chkey]
- del heartflag[chkey]
- except requests.exceptions.ConnectionError:
- print("Connection refused ..")
- return ret
+def db_table_creation_check(connection_db,table_name):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ cur = connection_db.cursor()
+ query_db = "select * from information_schema.tables where table_name='%s'" %(table_name)
+ cur.execute(query_db)
+ database_names = cur.fetchone()
+ if(database_names is not None):
+ if(table_name in database_names):
+ return True
+ else:
+ return False
+
+
+ except (psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ finally:
+ cur.close()
-#test setup for coverage
-def test_setup(args):
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic
- missing_htbt = float(int(args[2]))
- intvl = float(int(args[3]))
- intopic = args[4]
- outopic = args[5]
- mr_url = get_collector_uri()+'/events/'+intopic
- pol_url = get_policy_uri()+'/events/'+outopic
- print ("Message router url %s " % mr_url)
- print ("Policy url %s " % pol_url)
- print ("Interval %s " % intvl)
- print ("Input topic %s " % intopic)
- print ("Output topic %s " % outopic)
- #intvl = 60 # every second
+def commit_db(connection_db):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ return True
+ except(psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ return False
-#Main invocation
-def main(args):
- global periodic_scheduler
- global mr_url, pol_url, missing_htbt, intvl, intopic, outopic, nfc, cl_loop
- #mr_url = get_collector_uri()
- #pol_url = get_policy_uri()
- mr_url = args[0]
- intopic = args[1]
- pol_url = args[2]
- outopic = args[3]
- nfc = args[4]
- missing_htbt = int(args[5])
- intvl = int(args[6])
- cl_loop = args[7]
- print ("Message router url %s " % mr_url)
- print ("Policy router url %s " % pol_url)
- print ("VNF %s " % nfc)
- print ("Interval %s " % intvl)
- if( cl_loop != "internal_test") :
- #intvl = 60 # every second
- #Start periodic scheduler runs every interval
- periodic_scheduler = PeriodicScheduler()
- periodic_scheduler.setup(intvl, periodic_event,) # it executes the event just once
- periodic_scheduler.run() # it starts the scheduler
+def commit_and_close_db(connection_db):
+ if(os.getenv('pytest', "") == 'test'):
+ return True
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.close()
+ return True
+ except(psycopg2.DatabaseError, e):
+ msg = 'COMMON:Error %s' % e
+ _logger.error(msg)
+ return False
-if __name__ == "__main__":
- total = len(sys.argv)
- cmdargs = str(sys.argv)
- print ("The total numbers of args passed to the script: %d " % total)
- print ("Missing Heartbeat Args list: %s " % cmdargs)
- print ("Script name: %s" % str(sys.argv[0]))
- for i in range(total):
- print ("Argument # %d : %s" % (i, str(sys.argv[i])))
- main(sys.argv[1:])
-
-
-#force stop scheduler
-def stop():
- global periodic_scheduler
- if not periodic_scheduler is None:
- periodic_scheduler.stop()
+if __name__ == '__main__':
+ jsfile = sys.argv[1]
+ msg="HBT:HeartBeat thread Created"
+ _logger.info("HBT:HeartBeat thread Created")
+ msg="HBT:The config file name passed is -%s", jsfile
+ _logger.info(msg)
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval= db.read_hb_properties()
+ process_msg(jsfile,user_name, password, ip_address, port_num, db_name)
diff --git a/miss_htbt_service/misshtbt.sh b/miss_htbt_service/misshtbt.sh
index 5b598b1..5b598b1 100755..100644
--- a/miss_htbt_service/misshtbt.sh
+++ b/miss_htbt_service/misshtbt.sh
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 02433e9..2069865 100755..100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
# ============LICENSE_START=======================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
@@ -17,110 +16,406 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-#
-# Author Gokul Singaraju gs244f@att.com
-#
-
+# This is a main process that does the following
+# - Creates the CBS polling process that indicates the periodic download of
+# configuration file from CBS
+# - Creates heartbeat worker process that receives the Heartbeat messages from VNF
+# - Creates DB Monitoring process that generates Control loop event
+# - Download the CBS configuration and populate the DB
+#
+# Author Prakash Hosangady(ph553f@att.com)
+import traceback
import os
import sys
import json
+import datetime
+import time
+import math
import multiprocessing
import logging
import subprocess
+import yaml
+import socket
import get_logger
from pathlib import Path
-
import mod.trapd_settings as tds
+import htbtworker as heartbeat
+import os.path as path
+
+hb_properties_file = path.abspath(path.join(__file__, "../config/hbproperties.yaml"))
from mod.trapd_runtime_pid import save_pid, rm_pid
from mod.trapd_get_cbs_config import get_cbs_config
-#from mod.trapd_exit import cleanup_and_exit
+from mod.trapd_exit import cleanup_and_exit
from mod.trapd_http_session import init_session_obj
+ip_address = "localhost"
+port_num = 5432
+user_name = "postgres"
+password = "postgres"
+db_name = "hb_vnf"
+cbs_polling_required = "true"
+cbs_polling_interval = 300
+mr_url = None
+pol_url = None
+update_db = 0
+jsfile='empty'
+import sys
+ABSOLUTE_PATH1 = path.abspath(path.join(__file__, "../htbtworker.py"))
+ABSOLUTE_PATH2 = path.abspath(path.join(__file__, "../db_monitoring.py"))
+ABSOLUTE_PATH3 = path.abspath(path.join(__file__, "../check_health.py"))
+ABSOLUTE_PATH4 = path.abspath(path.join(__file__, "../cbs_polling.py"))
+def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
+ from psycopg2 import connect
+ import sys
+ try:
+ con = connect(user=user_name, host = ip_address, password = password)
+ database_name = db_name
+ con.autocommit = True
+ cur = con.cursor()
+ query_value = "SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '%s'" %(database_name)
+ cur.execute(query_value)
+ not_exists_row = cur.fetchone()
+ msg = "MSHBT:Create_database:DB not exists? ", not_exists_row
+ _logger.info(msg)
+ not_exists = not_exists_row[0]
+ if not_exists is True:
+ _logger.info("MSHBT:Creating database ...")
+ query_value = "CREATE DATABASE %s" %(database_name)
+ cur.execute(query_value)
+ else:
+ _logger.info("MSHBD:Database already exists")
+ '''
+ con = None
+ con = connect(user=user_name, host = ip_address, password=password)
+ database_name = db_name
+ con.autocommit = True
+ cur = con.cursor()
+ cur.execute('CREATE DATABASE %s IF NOT EXISTS %s' %(database_name,database_name))
+ '''
+ cur.close()
+ con.close()
+ except(Exception) as err:
+ msg = "MSHBD:DB Creation -",err
+ _logger.error(msg)
-mr_url = 'http://mrrouter.onap.org:3904'
-pol_url = 'http://mrrouter.onap.org:3904'
-intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
-outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
+#def get_pol_and_mr_urls(jsfile, pol_url, mr_url):
+# with open(jsfile, 'r') as outfile:
+# cfg = json.load(outfile)
+# mr_url = str(cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url'])
+# pol_url = str(cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url'])
-#Checks heartbeat by calling worker thread
-def checkhtbt(mr_url, intopic, pol_url, outopic, nfc, misshtbt,intvl, cl_loop):
- print('Doing some work',mr_url, misshtbt,intvl,intopic,outopic)
- my_file = Path("./miss_htbt_service/htbtworker.py")
- if my_file.is_file():
- subprocess.call(["python","./miss_htbt_service/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ])
+def read_hb_common(user_name,password,ip_address,port_num,db_name):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ hbc_pid = 10
+ hbc_srcName = "srvc_name"
+ hbc_time = 1584595881
+ hbc_state = "RUNNING"
+ return hbc_pid, hbc_state, hbc_srcName, hbc_time
+ connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ query_value = "SELECT process_id,source_name,last_accessed_time,current_state FROM hb_common;"
+ cur.execute(query_value)
+ rows = cur.fetchall()
+ hbc_pid = rows[0][0]
+ hbc_srcName = rows[0][1]
+ hbc_time = rows[0][2]
+ hbc_state = rows[0][3]
+ heartbeat.commit_and_close_db(connection_db)
+ cur.close()
+ return hbc_pid, hbc_state, hbc_srcName, hbc_time
+
+def create_update_hb_common(update_flg, process_id, state, user_name,password,ip_address,port_num,db_name):
+ current_time = int(round(time.time()))
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + os.getenv('SERVICE_NAME')
+ envPytest = os.getenv('pytest', "")
+ if (envPytest != 'test'):
+ connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ if(heartbeat.db_table_creation_check(connection_db,"hb_common") ==False):
+ cur.execute("CREATE TABLE hb_common (PROCESS_ID integer primary key,SOURCE_NAME varchar,LAST_ACCESSED_TIME integer,CURRENT_STATE varchar);")
+ query_value = "INSERT INTO hb_common VALUES(%d,'%s',%d,'%s');" %(process_id, source_name, current_time, state)
+ _logger.info("MSHBT:Created hb_common DB and updated new values")
+ cur.execute(query_value)
+ if(update_flg == 1):
+ query_value = "UPDATE hb_common SET PROCESS_ID='%d',SOURCE_NAME='%s', LAST_ACCESSED_TIME='%d',CURRENT_STATE='%s'" %(process_id, source_name, current_time, state)
+ _logger.info("MSHBT:Updated hb_common DB with new values")
+ cur.execute(query_value)
+ heartbeat.commit_and_close_db(connection_db)
+ cur.close()
+
+def create_update_vnf_table_1(jsfile,update_db,connection_db):
+ with open(jsfile, 'r') as outfile:
+ cfg = json.load(outfile)
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
+ else:
+ cur = connection_db.cursor()
+ if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
+ cur.execute("CREATE TABLE vnf_table_1 (EVENT_NAME varchar primary key,HEARTBEAT_MISSED_COUNT integer,HEARTBEAT_INTERVAL integer,CLOSED_CONTROL_LOOP_NAME varchar,POLICY_VERSION varchar,POLICY_NAME varchar,POLICY_SCOPE varchar,TARGET_TYPE varchar,TARGET varchar, VERSION varchar,SOURCE_NAME_COUNT integer,VALIDITY_FLAG integer);")
+ _logger.info("MSHBT:Created vnf_table_1 table")
+ if(update_db == 1):
+ query_value = "UPDATE vnf_table_1 SET VALIDITY_FLAG=0 where VALIDITY_FLAG=1;"
+ cur.execute(query_value)
+ _logger.info("MSHBT:Set Validity flag to zero in vnf_table_1 table")
+ # Put some initial values into the queue
+ db_query = "Select event_name from vnf_table_1"
+ cur.execute(db_query)
+ vnf_list = [item[0] for item in cur.fetchall()]
+ for vnf in (cfg['heartbeat_config']['vnfs']):
+ nfc = vnf['eventName']
+ #_logger.error("MSHBT:",nfc)
+ validity_flag = 1
+ source_name_count = 0
+ missed = vnf['heartbeatcountmissed']
+ intvl = vnf['heartbeatinterval']
+ clloop = vnf['closedLoopControlName']
+ policyVersion = vnf['policyVersion']
+ policyName = vnf['policyName']
+ policyScope = vnf['policyScope']
+ target_type = vnf['target_type']
+ target = vnf['target']
+ version = vnf['version']
+
+ if(nfc not in vnf_list):
+ query_value = "INSERT INTO vnf_table_1 VALUES('%s',%d,%d,'%s','%s','%s','%s','%s','%s','%s',%d,%d);" %(nfc,missed,intvl,clloop,policyVersion,policyName,policyScope,target_type, target,version,source_name_count,validity_flag)
else:
- subprocess.call(["python","/opt/app/misshtbt/bin/htbtworker.py" , mr_url , intopic, pol_url, outopic, nfc, str(misshtbt) , str(intvl), cl_loop ])
+ query_value = "UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT='%d',HEARTBEAT_INTERVAL='%d', CLOSED_CONTROL_LOOP_NAME='%s',POLICY_VERSION='%s',POLICY_NAME='%s', POLICY_SCOPE='%s',TARGET_TYPE='%s', TARGET='%s',VERSION='%s',VALIDITY_FLAG='%d' where EVENT_NAME='%s'" %(missed,intvl,clloop,policyVersion,policyName,policyScope,target_type,target,version,validity_flag,nfc)
+ if (envPytest != 'test'):
+ cur.execute(query_value)
+ #heartbeat.commit_and_close_db(connection_db)
+ if (envPytest != 'test'):
+ cur.close()
+ _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
+
+def hb_cbs_polling_process(pid_current):
+ my_file = Path("./miss_htbt_service/cbs_polling.py")
+# if my_file.is_file():
+ subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ])
+# else:
+# subprocess.call(["python3.6",ABSOLUTE_PATH4 , str(pid_current) ])
sys.stdout.flush()
+ _logger.info("MSHBT:Creaated CBS polling process")
+ return
+def hb_worker_process(config_file_path):
+ my_file = Path("./miss_htbt_service/htbtworker.py")
+# if my_file.is_file():
+ subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ])
+# else:
+# subprocess.call(["python3.6",ABSOLUTE_PATH1 , config_file_path ])
+ sys.stdout.flush()
+ _logger.info("MSHBT:Creaated Heartbeat worker process")
return
-_logger = get_logger.get_logger(__name__)
+def db_monitoring_process(current_pid,jsfile):
+ my_file = Path("./miss_htbt_service/db_monitoring.py")
+# if my_file.is_file():
+ subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile])
+# else:
+# subprocess.call(["python3.6",ABSOLUTE_PATH2 , str(current_pid),jsfile])
+ sys.stdout.flush()
+ _logger.info("MSHBT:Creaated DB Monitoring process")
+ return
-#main functon which reads yaml config and invokes heartbeat
-#monitoring
-if __name__ == '__main__':
- try:
- print("Heartbeat Microservice ...")
- if "INURL" in os.environ.keys():
- mr_url = os.environ['INURL']
- if "INTOPIC" in os.environ.keys():
- intopic = os.environ['INTOPIC']
- if "OUTURL" in os.environ.keys():
- pol_url = os.environ['OUTURL']
- if "OUTOPIC" in os.environ.keys():
- outopic = os.environ['OUTOPIC']
- print(outopic)
- multiprocessing.log_to_stderr()
- logger = multiprocessing.get_logger()
- logger.setLevel(logging.INFO)
- my_env = os.environ.copy()
- my_env["PYTHONPATH"] = my_env["PYTHONPATH"]+":/usr/local/lib/python3.6"+":./miss_htbt_service/"
- my_env["PATH"] = my_env["PATH"]+":./bin/:./miss_htbt_service/"
- p = subprocess.Popen(['check_health.py'],stdout=subprocess.PIPE,stderr=subprocess.STDOUT,env=my_env)
- #print(p.communicate())
- jsfile='empty'
-
- # re-request config from config binding service
- # (either broker, or json file override)
- if get_cbs_config():
- current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
- msg = "current config logged to : %s" % current_runtime_config_file_name
- logger.error(msg)
- print(msg)
+def read_hb_properties():
+ #Read the hbproperties.yaml for postgress and CBS related data
+ s=open(hb_properties_file, 'r')
+ a=yaml.load(s)
+
+ if((os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None)):
+ ip_address = a['pg_ipAddress']
+ port_num = a['pg_portNum']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
+ else:
+ ip_address = os.getenv('pg_ipAddress')
+ port_num = os.getenv('pg_portNum')
+ user_name = os.getenv('pg_userName')
+ password = os.getenv('pg_passwd')
+
+ dbName = a['pg_dbName']
+ db_name = dbName.lower()
+ cbs_polling_required = a['CBS_polling_allowed']
+ cbs_polling_interval = a['CBS_polling_interval']
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
+def fetch_json_file():
+ if get_cbs_config():
+ #current_runtime_config_file_name = tds.c_config['files.runtime_base_dir'] + "../etc/download.json"
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ current_runtime_config_file_name = "/tmp/opt/app/miss_htbt_service/etc/config.json"
+ else:
+ current_runtime_config_file_name = "../etc/download.json"
+ msg = "MSHBD:current config logged to : %s" % current_runtime_config_file_name
+ _logger.info(msg)
with open(current_runtime_config_file_name, 'w') as outfile:
json.dump(tds.c_config, outfile)
- jsfile = current_runtime_config_file_name
- else:
- msg = "CBS Config not available using local config"
- logger.error(msg)
- print(msg)
+ if os.getenv('pytest', "") == 'test':
+ jsfile = current_runtime_config_file_name
+ else:
+ jsfile = "../etc/config.json"
+ os.system('cp ../etc/download.json ../etc/config.json')
+ os.remove("../etc/download.json")
+ else:
+ msg = "MSHBD:CBS Config not available, using local config"
+ _logger.warning(msg)
my_file = Path("./etc/config.json")
if my_file.is_file():
- jsfile = "./etc/config.json"
+ jsfile = "./etc/config.json"
else:
- jsfile = "../etc/config.json"
-
- print("opening %s " % jsfile)
- with open(jsfile, 'r') as outfile:
- cfg = json.load(outfile)
- # Put some initial values into the queue
- mr_url = cfg['streams_subscribes']['ves_heartbeat']['dmaap_info']['topic_url']
- pol_url = cfg['streams_publishes']['ves_heartbeat']['dmaap_info']['topic_url']
- jobs = []
- print(cfg['heartbeat_config'])
- for vnf in (cfg['heartbeat_config']['vnfs']):
- print(vnf)
- nfc = vnf['nfNamingCode']
- missed = vnf['heartbeatcountmissed']
- intvl = vnf['heartbeatinterval']
- clloop = vnf['closedLoopControlName']
- print('{0} {1} {2} {3}'.format(nfc,missed,intvl,clloop))
- #Start Heartbeat monitoring process worker thread on VNFs configured
- logger.info("Starting threads...")
- p = multiprocessing.Process(target=checkhtbt, args=( mr_url, intopic, pol_url, outopic, nfc, missed, intvl, clloop))
- jobs.append(p)
- p.start()
- for j in jobs:
- j.join()
- print('%s.exitcode = %s' % (j.name, j.exitcode))
- except Exception as e:
- _logger.error("Fatal error. Could not start missing heartbeat service due to: {0}".format(e))
+ jsfile = "../etc/config.json"
+ msg = "MSHBT: The json file is - ", jsfile
+ _logger.info(msg)
+ return jsfile
+
+def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
+ envPytest = os.getenv('pytest', "")
+ if (envPytest != 'test'):
+ if(update_db == 0):
+ create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+ msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
+ _logger.info(msg)
+ connection_db = heartbeat.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ cur = connection_db.cursor()
+ if(update_db == 0):
+ if(heartbeat.db_table_creation_check(connection_db,"vnf_table_1") ==False):
+ create_update_vnf_table_1(jsfile,update_db,connection_db)
+ else:
+ create_update_vnf_table_1(jsfile,update_db,connection_db)
+ heartbeat.commit_and_close_db(connection_db)
+ cur.close()
+
+def create_process(job_list, jsfile, pid_current):
+ if(len(job_list) == 0):
+ p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
+ p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current,jsfile,))
+ p1.start()
+ p2.start()
+ job_list.append(p1)
+ job_list.append(p2)
+ msg = "MSHBD:jobs list is",job_list
+ _logger.info(msg)
+ return job_list
+
+_logger = get_logger.get_logger(__name__)
+
+def main():
+ try:
+ p = subprocess.Popen(['python3.6',ABSOLUTE_PATH3],stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
+ _logger.info("MSHBD:Execution Started")
+ job_list = []
+ pid_current = os.getpid()
+ ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties()
+ msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+ _logger.info(msg)
+ jsfile = fetch_json_file()
+ if(cbs_polling_required == True):
+ p3 = multiprocessing.Process(target=hb_cbs_polling_process, args=(pid_current,))
+ p3.start()
+ update_db = 0
+ create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+ state = "RECONFIGURATION"
+ update_flg = 0
+ create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
+ msg = "MSHBD:Current process id is",pid_current
+ _logger.info(msg)
+ _logger.info("MSHBD:Now be in a continuous loop")
+ i=0
+ while(True):
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
+ msg = "MSHBT: hb_common values ",hbc_pid, hbc_state, hbc_srcName, hbc_time
+ _logger.info(msg)
+ current_time = int(round(time.time()))
+ time_difference = current_time - hbc_time
+ msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is",hbc_pid,hbc_srcName,hbc_state,hbc_time,current_time,time_difference
+ _logger.info(msg)
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv('SERVICE_NAME'))
+ envPytest = os.getenv('pytest', "")
+ if (envPytest == 'test'):
+ if i == 2:
+ hbc_pid = pid_current
+ source_name = hbc_srcName
+ hbc_state = "RECONFIGURATION"
+ elif (i>3):
+ hbc_pid = pid_current
+ source_name = hbc_srcName
+ hbc_state = "RUNNING"
+ if (time_difference <60):
+ if((int(hbc_pid)==int(pid_current)) and (source_name==hbc_srcName)):
+ msg = "MSHBD:config status is",hbc_state
+ _logger.info(msg)
+ if(hbc_state=="RUNNING"):
+ state = "RUNNING"
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
+ elif(hbc_state=="RECONFIGURATION"):
+ _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+ jsfile = fetch_json_file()
+ update_db = 1
+ create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
+ msg = "MSHBD: parameters passed to DBM and HB are %d and %s",pid_current
+ _logger.info(msg)
+ job_list = create_process(job_list, jsfile, pid_current)
+ state = "RUNNING"
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, state, user_name,password,ip_address,port_num,db_name)
+
+ else:
+ _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
+ if(len(job_list)>=2):
+ _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
+ else:
+ jsfile = fetch_json_file()
+ msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",jsfile,pid_current
+ _logger.info(msg)
+ job_list = create_process(job_list, jsfile, pid_current)
+ else:
+ _logger.info("MSHBD:Active instance is inactive for long time: Time to switchover")
+ if((int(hbc_pid)!=int(pid_current))or (source_name!=hbc_srcName)):
+ _logger.info("MSHBD:Initiating to become Active Instance")
+ if(len(job_list)>=2):
+ _logger.info("MSHBD:HB and DBM thread are waiting to become ACTIVE")
+ else:
+ jsfile = fetch_json_file()
+ msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s",jsfile,pid_current
+ _logger.info(msg)
+ job_list = create_process(job_list, jsfile, pid_current)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name,password,ip_address,port_num,db_name)
+ update_flg = 1
+ create_update_hb_common(update_flg, pid_current, hbc_state, user_name,password,ip_address,port_num,db_name)
+ else:
+ _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
+ time.sleep(25)
+ if os.getenv('pytest', "") == 'test':
+ i = i + 1
+ if(i > 5):
+ _logger.info("Terminating main process for pytest")
+ p3.terminate()
+ time.sleep(1)
+ p3.join()
+ if(len(job_list)>0):
+ job_list[0].terminate()
+ time.sleep(1)
+ job_list[0].join()
+ job_list.remove(job_list[0])
+ if(len(job_list)>0):
+ job_list[0].terminate()
+ time.sleep(1)
+ job_list[0].join()
+ job_list.remove(job_list[0])
+ break
+
+ except (Exception) as e:
+ msg = "MSHBD:Exception as %s" %(str(traceback.format_exc()))
+ _logger.error(msg)
+
+ msg = "Fatal error. Could not start missing heartbeat service due to: {0}".format(e)
+ _logger.error(msg)
+
+if __name__ == '__main__':
+ main()
diff --git a/miss_htbt_service/mod/__init__.py b/miss_htbt_service/mod/__init__.py
index 1875bf6..1875bf6 100755..100644
--- a/miss_htbt_service/mod/__init__.py
+++ b/miss_htbt_service/mod/__init__.py
diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py
index 6247f4b..6247f4b 100755..100644
--- a/miss_htbt_service/mod/trapd_exit.py
+++ b/miss_htbt_service/mod/trapd_exit.py
diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py
index c108107..d2b615f 100755..100644
--- a/miss_htbt_service/mod/trapd_get_cbs_config.py
+++ b/miss_htbt_service/mod/trapd_get_cbs_config.py
@@ -33,7 +33,6 @@ import string
import time
import traceback
import collections
-
import mod.trapd_settings as tds
from onap_dcae_cbs_docker_client.client import get_config
from mod.trapd_exit import cleanup,cleanup_and_exit
@@ -92,7 +91,7 @@ def get_cbs_config():
msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \
" (invalid json?) - FATAL ERROR, exiting"
stdout_logger(msg)
- cleanup_and_exit(1,None)
+ cleanup_and_exit(0,None)
# recalc timeout, set default if not present
try:
diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py
index b34c19d..b34c19d 100755..100644
--- a/miss_htbt_service/mod/trapd_http_session.py
+++ b/miss_htbt_service/mod/trapd_http_session.py
diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py
index c89eaa3..1c40346 100755..100644
--- a/miss_htbt_service/mod/trapd_io.py
+++ b/miss_htbt_service/mod/trapd_io.py
@@ -36,7 +36,6 @@ import string
import time
import traceback
import unicodedata
-
# dcae_snmptrap
import mod.trapd_settings as tds
from mod.trapd_exit import cleanup_and_exit
@@ -49,327 +48,328 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-def roll_all_logs():
- """
- roll all active logs to timestamped version, open new one
- based on frequency defined in files.roll_frequency
- """
-
- # first roll all the eelf files
- # NOTE: this will go away when onap logging is standardized/available
- try:
- # open various ecomp logs - if any fails, exit
- for fd in [tds.eelf_error_fd, tds.eelf_debug_fd, tds.eelf_audit_fd,
- tds.eelf_metrics_fd, tds.arriving_traps_fd, tds.json_traps_fd]:
- fd.close()
-
- roll_file(tds.eelf_error_file_name)
- roll_file(tds.eelf_debug_file_name)
- roll_file(tds.eelf_audit_file_name)
- roll_file(tds.eelf_metrics_file_name)
-
- except Exception as e:
- msg = "Error closing logs: " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- reopened_successfully = open_eelf_logs()
- if not reopened_successfully:
- msg = "Error re-opening EELF logs during roll-over to timestamped versions - EXITING"
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- # json log
- roll_file(tds.json_traps_filename)
-
- try:
- tds.json_traps_fd = open_file(tds.json_traps_filename)
- except Exception as e:
- msg = ("Error opening json_log %s : %s" %
- (json_traps_filename, str(e)))
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- # arriving trap log
- roll_file(tds.arriving_traps_filename)
-
- try:
- tds.arriving_traps_fd = open_file(tds.arriving_traps_filename)
- except Exception as e:
- msg = ("Error opening arriving traps %s : %s" %
- (arriving_traps_filename, str(e)))
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
-
+#def roll_all_logs():
+# """
+# roll all active logs to timestamped version, open new one
+# based on frequency defined in files.roll_frequency
+# """
+#
+# # first roll all the eelf files
+# # NOTE: this will go away when onap logging is standardized/available
+# try:
+# # open various ecomp logs - if any fails, exit
+# for fd in [tds.eelf_error_fd, tds.eelf_debug_fd, tds.eelf_audit_fd,
+# tds.eelf_metrics_fd, tds.arriving_traps_fd, tds.json_traps_fd]:
+# fd.close()
+#
+# roll_file(tds.eelf_error_file_name)
+# roll_file(tds.eelf_debug_file_name)
+# roll_file(tds.eelf_audit_file_name)
+# roll_file(tds.eelf_metrics_file_name)
+#
+# except Exception as e:
+# msg = "Error closing logs: " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# reopened_successfully = open_eelf_logs()
+# if not reopened_successfully:
+# msg = "Error re-opening EELF logs during roll-over to timestamped versions - EXITING"
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# # json log
+# roll_file(tds.json_traps_filename)
+
+## try:
+# tds.json_traps_fd = open_file(tds.json_traps_filename)
+# except Exception as e:
+# msg = ("Error opening json_log %s : %s" %
+# (json_traps_filename, str(e)))
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# # arriving trap log
+# roll_file(tds.arriving_traps_filename)
+#
+# try:
+# tds.arriving_traps_fd = open_file(tds.arriving_traps_filename)
+# except Exception as e:
+# msg = ("Error opening arriving traps %s : %s" %
+# (arriving_traps_filename, str(e)))
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+#
# # # # # # # # # # # # # # # # # # #
# fx: setup_ecomp_logs -> log in eelf format until standard
# is released for python via LOG-161
# # # # # # # # # # ## # # # # # # #
-def open_eelf_logs():
- """
- open various (multiple ???) logs
- """
-
- try:
- # open various ecomp logs - if any fails, exit
-
- tds.eelf_error_file_name = (
- tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_error'])
- tds.eelf_error_fd = open_file(tds.eelf_error_file_name)
-
- except Exception as e:
- msg = "Error opening eelf error log : " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- try:
- tds.eelf_debug_file_name = (
- tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_debug'])
- tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name)
-
- except Exception as e:
- msg = "Error opening eelf debug log : " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- try:
- tds.eelf_audit_file_name = (
- tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_audit'])
- tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name)
- except Exception as e:
- msg = "Error opening eelf audit log : " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- try:
- tds.eelf_metrics_file_name = (
- tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_metrics'])
- tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name)
- except Exception as e:
- msg = "Error opening eelf metric log : " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
- return True
-
-# # # # # # # # # # # # # # # # # # #
+#def open_eelf_logs():
+# """
+# open various (multiple ???) logs
+# """
+#
+# try:
+# # open various ecomp logs - if any fails, exit
+#
+# tds.eelf_error_file_name = (
+# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_error'])
+# tds.eelf_error_fd = open_file(tds.eelf_error_file_name)
+#
+# except Exception as e:
+# msg = "Error opening eelf error log : " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# try:
+# tds.eelf_debug_file_name = (
+# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_debug'])
+# tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name)
+#
+# except Exception as e:
+# msg = "Error opening eelf debug log : " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# try:
+# tds.eelf_audit_file_name = (
+# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_audit'])
+# tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name)
+# except Exception as e:
+# msg = "Error opening eelf audit log : " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# try:
+# tds.eelf_metrics_file_name = (
+# tds.c_config['files.eelf_base_dir'] + "/" + tds.c_config['files.eelf_metrics'])
+# tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name)
+# except Exception as e:
+# msg = "Error opening eelf metric log : " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+# return True
+#
+## # # # # # # # # # # # # # # # # # #
# fx: roll_log_file -> move provided filename to timestamped version
# # # # # # # # # # ## # # # # # # #
-def roll_file(_loc_file_name):
- """
- move active file to timestamped archive
- """
-
- _file_name_suffix = "%s" % (datetime.datetime.fromtimestamp(time.time()).
- fromtimestamp(time.time()).
- strftime('%Y-%m-%dT%H:%M:%S'))
-
- _loc_file_name_bak = _loc_file_name + '.' + _file_name_suffix
-
- # roll existing file if present
- if os.path.isfile(_loc_file_name):
- try:
- os.rename(_loc_file_name, _loc_file_name_bak)
- return True
- except Exception as e:
- _msg = ("ERROR: Unable to rename %s to %s"
- % (_loc_file_name,
- _loc_file_name_bak))
- ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT,
- tds.CODE_GENERAL, _msg)
- return False
-
- return False
-
-# # # # # # # # # # # # #
-# fx: open_log_file
-# # # # # # # # # # # # #
-
-
-def open_file(_loc_file_name):
- """
- open _loc_file_name, return file handle
- """
-
- try:
- # open append mode just in case so nothing is lost, but should be
- # non-existent file
- _loc_fd = open(_loc_file_name, 'a')
- return _loc_fd
- except Exception as e:
- msg = "Error opening " + _loc_file_name + " append mode - " + str(e)
- stdout_logger(msg)
- cleanup_and_exit(1, tds.pid_file_name)
-
-
-# # # # # # # # # # # # #
-# fx: close_file
-# # # # # # # # # # # # #
- """
- close _loc_file_name, return True with success, False otherwise
- """
-
-
-def close_file(_loc_fd, _loc_filename):
-
- try:
- _loc_fd.close()
- return True
- except Exception as e:
- msg = "Error closing %s : %s - results indeterminate" % (
- _loc_filename, str(e))
- ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg)
- return False
-
-# # # # # # # # # # # # # # # # # # #
-# fx: ecomp_logger -> log in eelf format until standard
-# is released for python via LOG-161
-# # # # # # # # # # ## # # # # # # #
-
-def ecomp_logger(_log_type, _sev, _error_code, _msg):
- """
- Log to ecomp-style logfiles. Logs include:
-
- Note: this will be updated when https://jira.onap.org/browse/LOG-161
- is closed/available; until then, we resort to a generic format with
- valuable info in "extra=" field (?)
-
- :Parameters:
- _msg -
- :Exceptions:
- none
- :Keywords:
- eelf logging
- :Log Styles:
-
- :error.log:
-
- if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- % (requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
- errorCategory, errorCode, errorDescription, detailMessage))
-
- error.log example:
-
- 2018-02-20T07:21:34,007+00:00||MainThread|snmp_log_monitor||||FATAL|900||Tue Feb 20 07:21:11 UTC 2018 CRITICAL: [a0cae74e-160e-11e8-8f9f-0242ac110002] ALL publish attempts failed to DMAPP server: dcae-mrtr-zltcrdm5bdce1.1dff83.rdm5b.tci.att.com, topic: DCAE-COLLECTOR-UCSNMP, 339 trap(s) not published in epoch_serno range: 15191112530000 - 15191112620010
-
- :debug.log:
-
- if CommonLogger.verbose: print("using CommonLogger.DebugFile")
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- % (requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
- severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
-
- debug.log example:
-
- none available
-
- :audit.log:
-
- if CommonLogger.verbose: print("using CommonLogger.AuditFile")
- endAuditTime, endAuditMsec = self._getTime()
- if self._begTime is not None:
- d = {'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime,
- 'endmsecs': endAuditMsec}
- else:
- d = {'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime,
- 'endmsecs': endAuditMsec}
-
- self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
- severity, serverIPAddress, timer, server, IPAddress, className, unused,
- processKey, customField1, customField2, customField3, customField4,
- detailMessage), extra=d)
-
-
- :metrics.log:
-
- self._logger.log(50,'%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
- % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
- targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
- instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server,
- IPAddress,
- className, unused, processKey, targetVirtualEntity, customField1, customField2,
- customField3, customField4, detailMessage), extra=d)
-
- metrics.log example:
-
- none available
-
-
- """
-
- unused = ""
-
- # above were various attempts at setting time string found in other
- # libs; instead, let's keep it real:
- t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3]
- calling_fx = inspect.stack()[1][3]
-
- # DLFM: this entire module is a hack to override concept of prog logging
- # written across multiple files (???), making diagnostics IMPOSSIBLE!
- # Hoping to leverage ONAP logging libraries & standards when available
-
- # catch invalid log type
- if _log_type < 1 or _log_type > 5:
- msg = ("INVALID log type: %s " % _log_type)
- _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s"
- % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, (msg + _msg)))
- try:
- tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
- except Exception as e:
- stdout_logger(str(_out_rec))
-
- return False
-
- if _sev >= tds.minimum_severity_to_log:
- # log to appropriate eelf log (different files ??)
- if _log_type == tds.LOG_TYPE_ERROR:
- _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
- % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
- try:
- tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
- except Exception as e:
- stdout_logger(str(_out_rec))
- elif _log_type == tds.LOG_TYPE_AUDIT:
- # log message in AUDIT format
- _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
- % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
- try:
- tds.eelf_audit_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
- except Exception as e:
- stdout_logger(str(_out_rec))
- elif _log_type == tds.LOG_TYPE_METRICS:
- # log message in METRICS format
- _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
- % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
- try:
- tds.eelf_metrics_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
- except Exception as e:
- stdout_logger(str(_out_rec))
-
- # DEBUG *AND* others - there *MUST BE* a single time-sequenced log for diagnostics!
- # DLFM: too much I/O !!!
- # always write to debug; we need ONE logfile that has time-sequence full view !!!
- # log message in DEBUG format
- _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s"
- % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
- try:
- tds.eelf_debug_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
- except Exception as e:
- stdout_logger(str(_out_rec))
-
- return True
-
-# # # # # # # # # # # # #
-# fx: stdout_logger
-# # # # # # # # # # # # #
+#def roll_file(_loc_file_name):
+# """
+# move active file to timestamped archive
+# """
+#
+# _file_name_suffix = "%s" % (datetime.datetime.fromtimestamp(time.time()).
+# fromtimestamp(time.time()).
+# strftime('%Y-%m-%dT%H:%M:%S'))
+#
+# _loc_file_name_bak = _loc_file_name + '.' + _file_name_suffix
+#
+# # roll existing file if present
+# if os.path.isfile(_loc_file_name):
+# try:
+# os.rename(_loc_file_name, _loc_file_name_bak)
+# return True
+# except Exception as e:
+# _msg = ("ERROR: Unable to rename %s to %s"
+# % (_loc_file_name,
+# _loc_file_name_bak))
+# ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT,
+# tds.CODE_GENERAL, _msg)
+# return False
+#
+# return False
+#
+## # # # # # # # # # # # #
+## fx: open_log_file
+## # # # # # # # # # # # #
+#
+#
+#def open_file(_loc_file_name):
+# """
+# open _loc_file_name, return file handle
+# """
+#
+# try:
+# # open append mode just in case so nothing is lost, but should be
+# # non-existent file
+# _loc_fd = open(_loc_file_name, 'a')
+# return _loc_fd
+# except Exception as e:
+# msg = "Error opening " + _loc_file_name + " append mode - " + str(e)
+# stdout_logger(msg)
+# cleanup_and_exit(1, tds.pid_file_name)
+#
+#
+## # # # # # # # # # # # #
+## fx: close_file
+## # # # # # # # # # # # #
+# """
+# close _loc_file_name, return True with success, False otherwise
+# """
+#
+#
+#def close_file(_loc_fd, _loc_filename):
+#
+# try:
+#
+# _loc_fd.close()
+# return True
+# except Exception as e:
+# msg = "Error closing %s : %s - results indeterminate" % (
+# _loc_filename, str(e))
+# ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg)
+# return False
+#
+## # # # # # # # # # # # # # # # # # #
+## fx: ecomp_logger -> log in eelf format until standard
+## is released for python via LOG-161
+## # # # # # # # # # ## # # # # # # #
+#
+#def ecomp_logger(_log_type, _sev, _error_code, _msg):
+# """
+# Log to ecomp-style logfiles. Logs include:
+#
+# Note: this will be updated when https://jira.onap.org/browse/LOG-161
+# is closed/available; until then, we resort to a generic format with
+# valuable info in "extra=" field (?)
+#
+# :Parameters:
+# _msg -
+# :Exceptions:
+# none
+# :Keywords:
+# eelf logging
+# :Log Styles:
+#
+# :error.log:
+#
+# if CommonLogger.verbose: print("using CommonLogger.ErrorFile")
+# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+# % (requestID, threadID, serviceName, partnerName, targetEntity, targetServiceName,
+# errorCategory, errorCode, errorDescription, detailMessage))
+#
+# error.log example:
+#
+# 2018-02-20T07:21:34,007+00:00||MainThread|snmp_log_monitor||||FATAL|900||Tue Feb 20 07:21:11 UTC 2018 CRITICAL: [a0cae74e-160e-11e8-8f9f-0242ac110002] ALL publish attempts failed to DMAPP server: dcae-mrtr-zltcrdm5bdce1.1dff83.rdm5b.tci.att.com, topic: DCAE-COLLECTOR-UCSNMP, 339 trap(s) not published in epoch_serno range: 15191112530000 - 15191112620010
+#
+# :debug.log:
+#
+# if CommonLogger.verbose: print("using CommonLogger.DebugFile")
+# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+# % (requestID, threadID, serverName, serviceName, instanceUUID, upperLogLevel,
+# severity, serverIPAddress, server, IPAddress, className, timer, detailMessage))
+#
+# debug.log example:
+#
+# none available
+#
+# :audit.log:
+#
+# if CommonLogger.verbose: print("using CommonLogger.AuditFile")
+# endAuditTime, endAuditMsec = self._getTime()
+# if self._begTime is not None:
+# d = {'begtime': self._begTime, 'begmsecs': self._begMsec, 'endtime': endAuditTime,
+# 'endmsecs': endAuditMsec}
+# else:
+# d = {'begtime': endAuditTime, 'begmsecs': endAuditMsec, 'endtime': endAuditTime,
+# 'endmsecs': endAuditMsec}
+#
+# self._logger.log(50, '%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+# % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+# statusCode, responseCode, responseDescription, instanceUUID, upperLogLevel,
+# severity, serverIPAddress, timer, server, IPAddress, className, unused,
+# processKey, customField1, customField2, customField3, customField4,
+# detailMessage), extra=d)
+#
+#
+# :metrics.log:
+#
+# self._logger.log(50,'%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' \
+# % (requestID, serviceInstanceID, threadID, serverName, serviceName, partnerName,
+# targetEntity, targetServiceName, statusCode, responseCode, responseDescription,
+# instanceUUID, upperLogLevel, severity, serverIPAddress, timer, server,
+# IPAddress,
+# className, unused, processKey, targetVirtualEntity, customField1, customField2,
+# customField3, customField4, detailMessage), extra=d)
+#
+# metrics.log example:
+#
+# none available
+#
+#
+# """
+#
+# unused = ""
+#
+# # above were various attempts at setting time string found in other
+# # libs; instead, let's keep it real:
+# t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3]
+# calling_fx = inspect.stack()[1][3]
+#
+# # DLFM: this entire module is a hack to override concept of prog logging
+# # written across multiple files (???), making diagnostics IMPOSSIBLE!
+# # Hoping to leverage ONAP logging libraries & standards when available
+#
+# # catch invalid log type
+# if _log_type < 1 or _log_type > 5:
+# msg = ("INVALID log type: %s " % _log_type)
+# _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s"
+# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, (msg + _msg)))
+# try:
+# tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
+# except Exception as e:
+# stdout_logger(str(_out_rec))
+#
+# return False
+#
+# if _sev >= tds.minimum_severity_to_log:
+# # log to appropriate eelf log (different files ??)
+# if _log_type == tds.LOG_TYPE_ERROR:
+# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
+# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
+# try:
+# tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
+# except Exception as e:
+# stdout_logger(str(_out_rec))
+# elif _log_type == tds.LOG_TYPE_AUDIT:
+# # log message in AUDIT format
+# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
+# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
+# try:
+# tds.eelf_audit_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
+# except Exception as e:
+# stdout_logger(str(_out_rec))
+# elif _log_type == tds.LOG_TYPE_METRICS:
+# # log message in METRICS format
+# _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s'
+# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
+# try:
+# tds.eelf_metrics_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
+# except Exception as e:
+# stdout_logger(str(_out_rec))
+#
+# # DEBUG *AND* others - there *MUST BE* a single time-sequenced log for diagnostics!
+# # DLFM: too much I/O !!!
+# # always write to debug; we need ONE logfile that has time-sequence full view !!!
+# # log message in DEBUG format
+# _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s"
+# % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg))
+# try:
+# tds.eelf_debug_fd.write('%s|%s\n' % (t_out, str(_out_rec)))
+# except Exception as e:
+# stdout_logger(str(_out_rec))
+#
+# return True
+#
+## # # # # # # # # # # # #
+## fx: stdout_logger
+## # # # # # # # # # # # #
def stdout_logger(_msg):
diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py
index c6ef76e..c6ef76e 100755..100644
--- a/miss_htbt_service/mod/trapd_runtime_pid.py
+++ b/miss_htbt_service/mod/trapd_runtime_pid.py
diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py
index be87e26..be87e26 100755..100644
--- a/miss_htbt_service/mod/trapd_settings.py
+++ b/miss_htbt_service/mod/trapd_settings.py
diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py
new file mode 100644
index 0000000..a76c886
--- /dev/null
+++ b/miss_htbt_service/mod/trapd_vnf_table.py
@@ -0,0 +1,106 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+## Author Kiran Mandal (km386e)
+"""
+trapd_vnf_table verifies the successful creation of DB Tables.
+"""
+
+
+import psycopg2
+import os
+import sys
+import htbtworker as pm
+import misshtbtd as db
+import config_notif as cf
+import cbs_polling as cbs
+import logging
+import get_logger
+import yaml
+import os.path as path
+
+prog_name = os.path.basename(__file__)
+hb_properties_file = path.abspath(path.join(__file__, "../../config/hbproperties.yaml"))
+
+def hb_properties():
+ #Read the hbproperties.yaml for postgress and CBS related data
+ s=open(hb_properties_file, 'r')
+ a=yaml.load(s)
+ ip_address = a['pg_ipAddress']
+ port_num = a['pg_portNum']
+ user_name = a['pg_userName']
+ password = a['pg_passwd']
+ dbName = a['pg_dbName']
+ db_name = dbName.lower()
+ cbs_polling_required = a['CBS_polling_allowed']
+ cbs_polling_interval = a['CBS_polling_interval']
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
+ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties()
+
+def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name):
+ connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ # cur = connection_db.cursor()
+ try:
+ _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1")
+ except Exception as e:
+ return None
+
+ return _db_status
+
+def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name):
+
+ connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ # cur = connection_db.cursor()
+ try:
+ _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2")
+ except Exception as e:
+ return None
+
+ return _db_status
+
+def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name):
+
+ connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ #cur = connection_db.cursor()
+ try:
+ _db_status=pm.db_table_creation_check(connection_db,"hb_common")
+ except Exception as e:
+ return None
+
+ return _db_status
+
+
+def verify_cbsPolling_required():
+ try:
+ _cbspolling_status=cf.config_notif_run()
+ except Exception as e:
+ return None
+
+ return _cbspolling_status
+
+def verify_cbspolling():
+ try:
+ _cbspolling=cbs.currentpidMain(10)
+ except Exception as e:
+ return None
+
+ return _cbspolling