summaryrefslogtreecommitdiffstats
path: root/miss_htbt_service
diff options
context:
space:
mode:
authorHansen, Tony (th1395) <th1395@att.com>2021-12-01 22:01:56 +0000
committerHansen, Tony (th1395) <th1395@att.com>2021-12-02 19:58:31 +0000
commit2108563705a2ec8bb80029d36122c69fa4d06df5 (patch)
tree1453b42bc3535635b136d0963a290243e26ae35f /miss_htbt_service
parent8d7c0201456b7f9af6e91fea90354f4c3de323fe (diff)
run the black formatting tool on python code
also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com> Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Diffstat (limited to 'miss_htbt_service')
-rw-r--r--miss_htbt_service/__init__.py5
-rw-r--r--miss_htbt_service/cbs_polling.py24
-rw-r--r--miss_htbt_service/check_health.py47
-rw-r--r--miss_htbt_service/db_monitoring.py243
-rw-r--r--miss_htbt_service/get_logger.py10
-rw-r--r--miss_htbt_service/htbtworker.py114
-rw-r--r--miss_htbt_service/misshtbtd.py257
-rw-r--r--miss_htbt_service/mod/trapd_exit.py11
-rw-r--r--miss_htbt_service/mod/trapd_get_cbs_config.py32
-rw-r--r--miss_htbt_service/mod/trapd_http_session.py6
-rw-r--r--miss_htbt_service/mod/trapd_io.py23
-rw-r--r--miss_htbt_service/mod/trapd_runtime_pid.py10
-rw-r--r--miss_htbt_service/mod/trapd_settings.py6
-rw-r--r--miss_htbt_service/mod/trapd_vnf_table.py193
14 files changed, 601 insertions, 380 deletions
diff --git a/miss_htbt_service/__init__.py b/miss_htbt_service/__init__.py
index 1ae08ca..a56d536 100644
--- a/miss_htbt_service/__init__.py
+++ b/miss_htbt_service/__init__.py
@@ -1,5 +1,5 @@
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# ============LICENSE_START=======================================================
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,4 +16,3 @@
# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest
# best practice guideline
-
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py
index e7bdef1..53585ba 100644
--- a/miss_htbt_service/cbs_polling.py
+++ b/miss_htbt_service/cbs_polling.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -35,7 +35,15 @@ _logger = logging.getLogger(__name__)
def poll_cbs(current_pid: int) -> None:
jsfile = db.fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
msg = "CBSP:Main process ID in hb_common is %d", hbc_pid
_logger.info(msg)
@@ -43,13 +51,13 @@ def poll_cbs(current_pid: int) -> None:
_logger.info(msg)
msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval
_logger.info(msg)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
cbs_polling_interval = "30"
time.sleep(int(cbs_polling_interval))
hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
if current_pid == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING":
_logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION")
state = "RECONFIGURATION"
@@ -60,7 +68,7 @@ def poll_cbs(current_pid: int) -> None:
def cbs_polling_loop(current_pid: int):
- get_logger.configure_logger('cbs_polling')
+ get_logger.configure_logger("cbs_polling")
while True:
poll_cbs(current_pid)
diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py
index 4d04210..71f7c58 100644
--- a/miss_htbt_service/check_health.py
+++ b/miss_htbt_service/check_health.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,38 +22,39 @@ from urllib import parse
class GetHandler(BaseHTTPRequestHandler):
-
def do_GET(self):
parsed_path = parse.urlparse(self.path)
- message = '\n'.join([
- 'CLIENT VALUES:',
- 'client_address=%s (%s)' % (self.client_address, self.address_string()),
- 'command=%s' % self.command,
- 'path=%s' % self.path,
- 'real path=%s' % parsed_path.path,
- 'query=%s' % parsed_path.query,
- 'request_version=%s' % self.request_version,
- '',
- 'SERVER VALUES:',
- 'server_version=%s' % self.server_version,
- 'sys_version=%s' % self.sys_version,
- 'protocol_version=%s' % self.protocol_version,
- '',
- ])
+ message = "\n".join(
+ [
+ "CLIENT VALUES:",
+ "client_address=%s (%s)" % (self.client_address, self.address_string()),
+ "command=%s" % self.command,
+ "path=%s" % self.path,
+ "real path=%s" % parsed_path.path,
+ "query=%s" % parsed_path.query,
+ "request_version=%s" % self.request_version,
+ "",
+ "SERVER VALUES:",
+ "server_version=%s" % self.server_version,
+ "sys_version=%s" % self.sys_version,
+ "protocol_version=%s" % self.protocol_version,
+ "",
+ ]
+ )
self.send_response(200)
self.end_headers()
- self.wfile.write(bytes(message, 'utf-8'))
+ self.wfile.write(bytes(message, "utf-8"))
return
def do_POST(self):
- content_len = int(self.headers.get('content-length', 0))
+ content_len = int(self.headers.get("content-length", 0))
post_body = self.rfile.read(content_len)
self.send_response(200)
self.end_headers()
data = json.loads(post_body)
- self.wfile.write(bytes(data['health'], 'utf-8'))
+ self.wfile.write(bytes(data["health"], "utf-8"))
return
@@ -61,9 +62,9 @@ def start_health_check_server() -> None:
from http.server import HTTPServer
server = HTTPServer(("", 10002), GetHandler)
- print('Starting server at http://localhost:10002')
+ print("Starting server at http://localhost:10002")
server.serve_forever()
-if __name__ == '__main__':
+if __name__ == "__main__":
start_health_check_server()
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index a405876..32a2d5c 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -37,81 +37,100 @@ import get_logger
_logger = logging.getLogger(__name__)
-def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time,
- closed_control_loop_name, version, target):
+def sendControlLoopEvent(
+ CLType,
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+):
msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type
_logger.info(msg)
if CLType == "ONSET":
_logger.info("DBM:Heartbeat not received, raising alarm event")
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
elif CLType == "ABATED":
_logger.info("DBM:Heartbeat received, clearing alarm event")
# last_date_time = datetime.datetime.now()
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
else:
@@ -131,7 +150,7 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_sc
msg = "DBM:Status code for sending the control loop event is", ret
_logger.info(msg)
except Exception as err:
- msg = 'Message send failure : ', err
+ msg = "Message send failure : ", err
_logger.error(msg)
return True
@@ -140,22 +159,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
while True:
time.sleep(20)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break
try:
- with open(json_file, 'r') as outfile:
+ with open(json_file, "r") as outfile:
cfg = json.load(outfile)
- pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
+ pol_url = str(cfg["streams_publishes"]["dcae_cl_out"]["dmaap_info"]["topic_url"])
except Exception as err:
- msg = 'Json file process error : ', err
+ msg = "Json file process error : ", err
_logger.error(msg)
continue
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING":
@@ -170,9 +191,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
_logger.info("DBM:Waiting for hb_common state to become RUNNING")
break
- cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
- "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
- "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,))
+ cur.execute(
+ "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
+ "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
+ "target, version FROM vnf_table_1 WHERE event_name = %s",
+ (event_name,),
+ )
rows = cur.fetchall()
validity_flag = rows[0][0]
source_name_count = rows[0][1]
@@ -189,8 +213,11 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if validity_flag == 1:
for source_name_key in range(source_name_count):
epoc_time = int(round(time.time() * 1000))
- cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
- "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1)))
+ cur.execute(
+ "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
+ "event_name = %s AND source_name_key = %s",
+ (event_name, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -198,20 +225,44 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
srcName = row[0][1]
cl_flag = row[0][2]
if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0:
- sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 1
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1:
- sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 0
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
else: # pragma: no cover
@@ -233,15 +284,23 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if __name__ == "__main__":
- get_logger.configure_logger('db_monitoring')
+ get_logger.configure_logger("db_monitoring")
_logger.info("DBM: DBM Process started")
current_pid = sys.argv[1]
jsfile = sys.argv[2]
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
while True:
db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break
diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py
index 55286eb..8068f6b 100644
--- a/miss_htbt_service/get_logger.py
+++ b/miss_htbt_service/get_logger.py
@@ -1,6 +1,6 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
import logging.handlers
LOG_LEVEL = logging.DEBUG
-LOG_FORMAT = '%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s'
+LOG_FORMAT = "%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s"
LOG_MAXSIZE = 10485760 * 5
LOG_BACKUP_COUNT = 10
@@ -39,9 +39,9 @@ def configure_logger(proc_name: str) -> None:
# Add rotating log file handler
if proc_name:
- logfile_path = './hb_%s_logs.txt' % proc_name
+ logfile_path = "./hb_%s_logs.txt" % proc_name
else:
- logfile_path = './hb_logs.txt'
+ logfile_path = "./hb_logs.txt"
fhandler = logging.handlers.RotatingFileHandler(logfile_path, maxBytes=LOG_MAXSIZE, backupCount=LOG_BACKUP_COUNT)
fhandler.setFormatter(formatter)
root.addHandler(fhandler)
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index be1b6aa..44436a2 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"):
with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
cfg = json.load(outfile)
elif i == 2:
- with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile:
+ with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile:
cfg = json.load(outfile)
return cfg
@@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
sleep_duration = 20
while True:
time.sleep(sleep_duration)
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
+ mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
if hbc_state == "RECONFIGURATION":
_logger.info("HBT:Waiting for hb_common state to become RUNNING")
time.sleep(10)
else:
break
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
connection_db = 0
else:
@@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
msg = "\n\nHBT:eventnameList values ", eventnameList
_logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
- get_url = mr_url + '/DefaultGroup/1?timeout=15000'
+ get_url = mr_url + "/DefaultGroup/1?timeout=15000"
else:
- get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000'
+ get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000"
msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jsonobj = read_json_file(i)
jobj = []
jobj.append(jsonobj)
@@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
# If mrstatus in message body indicates some information, not json msg.
if "mrstatus" in inputString:
continue
- jlist = inputString.split('\n')
+ jlist = inputString.split("\n")
# Process the DMaaP input message retreived
error = False
for line in jlist:
try:
jobj = json.loads(line)
except ValueError:
- msg = 'HBT:Decoding JSON has failed'
+ msg = "HBT:Decoding JSON has failed"
_logger.error(msg)
error = True
break
@@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
continue
for item in jobj:
try:
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jitem = jsonobj
else:
jitem = json.loads(item)
- srcname = (jitem['event']['commonEventHeader']['sourceName'])
- lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+ srcname = jitem["event"]["commonEventHeader"]["sourceName"]
+ lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
# if lastEpochMicrosec looks like microsec, align it with millisec
if lastepo > 1000000000000000:
lastepo = int(lastepo / 1000)
- seqnum = (jitem['event']['commonEventHeader']['sequence'])
- eventName = (jitem['event']['commonEventHeader']['eventName'])
+ seqnum = jitem["event"]["commonEventHeader"]["sequence"]
+ eventName = jitem["event"]["commonEventHeader"]["eventName"]
except Exception as err:
msg = "HBT message process error - ", err
_logger.error(msg)
@@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_table_creation_check(connection_db, "vnf_table_2") is False:
msg = "HBT:Creating vnf_table_2"
_logger.info(msg)
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_2 (
EVENT_NAME varchar,
SOURCE_NAME_KEY integer,
@@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
LAST_EPO_TIME BIGINT,
SOURCE_NAME varchar,
CL_FLAG integer
- )""")
+ )"""
+ )
else:
msg = "HBT:vnf_table_2 is already there"
_logger.info(msg)
if eventName in eventnameList: # pragma: no cover
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
break
cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,))
row = cur.fetchone()
@@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
cl_flag = 0
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
- cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND "
- "source_name_key = %s", (eventName, (source_name_key + 1)))
+ cur.execute(
+ "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
+ (eventName, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_srcname == srcname:
msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
- "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- (lastepo, srcname, eventName, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
+ "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
+ (lastepo, srcname, eventName, (source_name_key + 1)),
+ )
source_name_key = source_name_count
break
else:
@@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if source_name_count == (source_name_key + 1):
source_name_key = source_name_count + 1
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else:
_logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
commit_db(connection_db)
commit_and_close_db(connection_db)
- if os.getenv('pytest', "") != 'test':
+ if os.getenv("pytest", "") != "test":
cur.close()
def postgres_db_open(username, password, host, port, database_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
return connection
def db_table_creation_check(connection_db, table_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
cur = connection_db.cursor()
@@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name):
else:
return False
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
finally:
cur.close()
def commit_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
def commit_and_close_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
-if __name__ == '__main__':
- get_logger.configure_logger('htbtworker')
+if __name__ == "__main__":
+ get_logger.configure_logger("htbtworker")
jsfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
msg = "HBT:The config file name passed is -%s", jsfile
_logger.info(msg)
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
process_msg(jsfile, user_name, password, ip_address, port_num, db_name)
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 6be2260..5ba0860 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -2,9 +2,9 @@
# ============LICENSE_START=======================================================
# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json"
def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
from psycopg2 import connect
+
try:
con = connect(user=user_name, host=ip_address, password=password)
database_name = db_name
@@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
def read_hb_common(user_name, password, ip_address, port_num, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
hbc_pid = 10
hbc_srcName = "srvc_name"
hbc_time = 1584595881
@@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name):
def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
- source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test':
+ source_name = source_name + "-" + os.getenv("SERVICE_NAME", "")
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test":
connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "hb_common") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE hb_common (
PROCESS_ID integer primary key,
SOURCE_NAME varchar,
LAST_ACCESSED_TIME integer,
CURRENT_STATE varchar
- )""")
+ )"""
+ )
cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state))
_logger.info("MSHBT:Created hb_common DB and updated new values")
elif update_flg == 1:
- cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
- "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name))
+ cur.execute(
+ "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
+ "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s",
+ (current_time, state, process_id, source_name),
+ )
_logger.info("MSHBT:Updated hb_common DB with new values")
heartbeat.commit_and_close_db(connection_db)
cur.close()
def create_update_vnf_table_1(jsfile, update_db, connection_db):
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- hbcfg = cfg['heartbeat_config']
+ hbcfg = cfg["heartbeat_config"]
jhbcfg = json.loads(hbcfg)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
else:
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_1 (
EVENT_NAME varchar primary key,
HEARTBEAT_MISSED_COUNT integer,
@@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
VERSION varchar,
SOURCE_NAME_COUNT integer,
VALIDITY_FLAG integer
- )""")
+ )"""
+ )
_logger.info("MSHBT:Created vnf_table_1 table")
if update_db == 1:
cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1")
@@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
# Put some initial values into the queue
cur.execute("SELECT event_name FROM vnf_table_1")
vnf_list = [item[0] for item in cur.fetchall()]
- for vnf in (jhbcfg['vnfs']):
- nfc = vnf['eventName']
+ for vnf in jhbcfg["vnfs"]:
+ nfc = vnf["eventName"]
validity_flag = 1
source_name_count = 0
- missed = vnf['heartbeatcountmissed']
- intvl = vnf['heartbeatinterval']
- clloop = vnf['closedLoopControlName']
- policyVersion = vnf['policyVersion']
- policyName = vnf['policyName']
- policyScope = vnf['policyScope']
- target_type = vnf['target_type']
- target = vnf['target']
- version = vnf['version']
-
- if envPytest == 'test':
+ missed = vnf["heartbeatcountmissed"]
+ intvl = vnf["heartbeatinterval"]
+ clloop = vnf["closedLoopControlName"]
+ policyVersion = vnf["policyVersion"]
+ policyName = vnf["policyName"]
+ policyScope = vnf["policyScope"]
+ target_type = vnf["target_type"]
+ target = vnf["target"]
+ version = vnf["version"]
+
+ if envPytest == "test":
# skip executing SQL in test
continue
if nfc not in vnf_list:
- cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
- (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target,
- version, source_name_count, validity_flag))
+ cur.execute(
+ "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
+ (
+ nfc,
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ source_name_count,
+ validity_flag,
+ ),
+ )
_logger.debug("Inserted new event_name = %s into vnf_table_1", nfc)
else:
- cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
+ cur.execute(
+ """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s,
TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""",
- (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version,
- validity_flag, nfc))
- if envPytest != 'test':
+ (
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ validity_flag,
+ nfc,
+ ),
+ )
+ if envPytest != "test":
cur.close()
_logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
@@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile):
def read_hb_properties_default():
# Read the hbproperties.yaml for postgress and CBS related data
- s = open(hb_properties_file, 'r')
+ s = open(hb_properties_file, "r")
a = yaml.full_load(s)
- if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None):
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
+ if (
+ (os.getenv("pg_ipAddress") is None)
+ or (os.getenv("pg_portNum") is None)
+ or (os.getenv("pg_userName") is None)
+ or (os.getenv("pg_passwd") is None)
+ ):
+ ip_address = a["pg_ipAddress"]
+ port_num = a["pg_portNum"]
+ user_name = a["pg_userName"]
+ password = a["pg_passwd"]
else:
- ip_address = os.getenv('pg_ipAddress')
- port_num = os.getenv('pg_portNum')
- user_name = os.getenv('pg_userName')
- password = os.getenv('pg_passwd')
+ ip_address = os.getenv("pg_ipAddress")
+ port_num = os.getenv("pg_portNum")
+ user_name = os.getenv("pg_userName")
+ password = os.getenv("pg_passwd")
- dbName = a['pg_dbName']
+ dbName = a["pg_dbName"]
db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
+ cbs_polling_required = a["CBS_polling_allowed"]
+ cbs_polling_interval = a["CBS_polling_interval"]
s.close()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
def read_hb_properties(jsfile):
try:
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
except Exception as err:
msg = "CBS Json file load error - " + str(err)
@@ -242,20 +282,20 @@ def read_hb_properties(jsfile):
return read_hb_properties_default()
try:
- ip_address = str(cfg['pg_ipAddress'])
- port_num = str(cfg['pg_portNum'])
- user_name = str(cfg['pg_userName'])
- password = str(cfg['pg_passwd'])
- dbName = str(cfg['pg_dbName'])
+ ip_address = str(cfg["pg_ipAddress"])
+ port_num = str(cfg["pg_portNum"])
+ user_name = str(cfg["pg_userName"])
+ password = str(cfg["pg_passwd"])
+ dbName = str(cfg["pg_dbName"])
db_name = dbName.lower()
- cbs_polling_required = str(cfg['CBS_polling_allowed'])
- cbs_polling_interval = str(cfg['CBS_polling_interval'])
- consumer_id = str(cfg['consumerID'])
- group_id = str(cfg['groupID'])
- os.environ['consumerID'] = consumer_id
- os.environ['groupID'] = group_id
+ cbs_polling_required = str(cfg["CBS_polling_allowed"])
+ cbs_polling_interval = str(cfg["CBS_polling_interval"])
+ consumer_id = str(cfg["consumerID"])
+ group_id = str(cfg["groupID"])
+ os.environ["consumerID"] = consumer_id
+ os.environ["groupID"] = group_id
if "SERVICE_NAME" in cfg:
- os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"])
except Exception as err:
msg = "CBS Json file read parameter error - " + str(err)
_logger.error(msg)
@@ -273,7 +313,7 @@ def fetch_json_file() -> str:
# Try to get config from CBS. If succeeded, config json is stored to tds.c_config .
if get_cbs_config():
# Save config to temporary file
- with tempfile.NamedTemporaryFile('w', delete=False) as temp:
+ with tempfile.NamedTemporaryFile("w", delete=False) as temp:
_logger.info("MSHBD: New config saved to temp file %s", temp.name)
json.dump(tds.c_config, temp)
# Swap current config with downloaded config
@@ -289,8 +329,8 @@ def fetch_json_file() -> str:
def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test': # pragma: no cover
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test": # pragma: no cover
if update_db == 0:
create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
@@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current):
if len(job_list) == 0:
p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
time.sleep(1)
- p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,))
+ p2 = multiprocessing.Process(
+ target=db_monitoring_process,
+ args=(
+ pid_current,
+ jsfile,
+ ),
+ )
p1.start()
time.sleep(1)
p2.start()
@@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current):
def main():
- get_logger.configure_logger('misshtbtd')
+ get_logger.configure_logger("misshtbtd")
pid_current = os.getpid()
hc_proc = multiprocessing.Process(target=check_health.start_health_check_server)
cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,))
@@ -334,15 +380,32 @@ def main():
job_list = []
jsfile = fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
- msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = read_hb_properties(jsfile)
+ msg = (
+ "MSHBT:HB Properties -",
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ )
_logger.info(msg)
update_db = 0
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
state = "RECONFIGURATION"
update_flg = 0
create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
- if cbs_polling_required == 'True':
+ if cbs_polling_required == "True":
# note: cbs_polling process must be started after `hb_common` table created
cbs_polling_proc.start()
_logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid)
@@ -350,17 +413,27 @@ def main():
_logger.info("MSHBD:Now be in a continuous loop")
i = 0
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time
_logger.info(msg)
current_time = int(round(time.time()))
time_difference = current_time - hbc_time
- msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference
+ msg = (
+ "MSHBD:pid,srcName,state,time,ctime,timeDiff is",
+ hbc_pid,
+ hbc_srcName,
+ hbc_state,
+ hbc_time,
+ current_time,
+ time_difference,
+ )
_logger.info(msg)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
if i == 2:
hbc_pid = pid_current
source_name = hbc_srcName
@@ -376,9 +449,13 @@ def main():
if hbc_state == "RUNNING":
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
elif hbc_state == "RECONFIGURATION":
- _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+ _logger.info(
+ "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes"
+ )
jsfile = fetch_json_file()
update_db = 1
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
@@ -387,7 +464,9 @@ def main():
job_list = create_process(job_list, jsfile, pid_current)
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
@@ -395,7 +474,11 @@ def main():
_logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
else:
jsfile = fetch_json_file()
- msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current
+ msg = (
+ "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",
+ jsfile,
+ pid_current,
+ )
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
else:
@@ -409,13 +492,17 @@ def main():
msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
update_flg = 1
- create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
time.sleep(25)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
i = i + 1
if i > 5:
_logger.info("Terminating main process for pytest")
@@ -451,5 +538,5 @@ def main():
cbs_polling_proc.join()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py
index c741fe5..aae12bb 100644
--- a/miss_htbt_service/mod/trapd_exit.py
+++ b/miss_htbt_service/mod/trapd_exit.py
@@ -1,9 +1,7 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,7 +21,7 @@ trapc_exit_snmptrapd is responsible for removing any existing runtime PID
file, and exiting with the provided (param 1) exit code
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import sys
import os
@@ -62,6 +60,7 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name):
rc = rm_pid(_pid_file_name)
sys.exit(_loc_exit_code)
+
# # # # # # # # # # # # #
# fx: cleanup_and_exit
# - remove pid file
@@ -89,5 +88,3 @@ def cleanup(_loc_exit_code, _pid_file_name):
if _pid_file_name is not None:
rc = rm_pid(_pid_file_name)
-
-
diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py
index 70dac53..f9fd4c3 100644
--- a/miss_htbt_service/mod/trapd_get_cbs_config.py
+++ b/miss_htbt_service/mod/trapd_get_cbs_config.py
@@ -1,10 +1,8 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,7 +23,7 @@ env variable that specifies JSON equiv of CBS config (typically used for
testing purposes)
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import json
import os
@@ -36,7 +34,7 @@ import traceback
import collections.abc
from onap_dcae_cbs_docker_client.client import get_config
from mod import trapd_settings as tds
-from mod.trapd_exit import cleanup,cleanup_and_exit
+from mod.trapd_exit import cleanup, cleanup_and_exit
from mod.trapd_io import stdout_logger
prog_name = os.path.basename(__file__)
@@ -75,46 +73,44 @@ def get_cbs_config():
_cbs_sim_json_file = os.getenv("CBS_HTBT_JSON", "None")
except Exception as e:
stdout_logger(msg)
- cleanup(1,None)
+ cleanup(1, None)
return False
msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting"
if _cbs_sim_json_file == "None":
stdout_logger(msg)
- cleanup(1,None)
+ cleanup(1, None)
return False
else:
- msg = ("ONAP controller override specified via CBS_HTBT_JSON: %s" %
- _cbs_sim_json_file)
+ msg = "ONAP controller override specified via CBS_HTBT_JSON: %s" % _cbs_sim_json_file
stdout_logger(msg)
- msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \
- " (invalid json?) - FATAL ERROR, exiting"
+ msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting"
try:
tds.c_config = json.load(open(_cbs_sim_json_file))
except Exception as e:
stdout_logger(msg)
- cleanup_and_exit(0,None)
+ cleanup_and_exit(0, None)
# recalc timeout, set default if not present
try:
- tds.timeout_seconds = tds.c_config['publisher.http_timeout_milliseconds'] / 1000.0
+ tds.timeout_seconds = tds.c_config["publisher.http_timeout_milliseconds"] / 1000.0
except Exception as e:
tds.timeout_seconds = 1.5
# recalc seconds_between_retries, set default if not present
try:
- tds.seconds_between_retries = tds.c_config['publisher.http_milliseconds_between_retries'] / 1000.0
+ tds.seconds_between_retries = tds.c_config["publisher.http_milliseconds_between_retries"] / 1000.0
except Exception as e:
- tds.seconds_between_retries = .750
+ tds.seconds_between_retries = 0.750
# recalc min_severity_to_log, set default if not present
try:
- tds.minimum_severity_to_log = tds.c_config['files.minimum_severity_to_log']
+ tds.minimum_severity_to_log = tds.c_config["files.minimum_severity_to_log"]
except Exception as e:
tds.minimum_severity_to_log = 3
try:
- tds.publisher_retries = tds.c_config['publisher.http_retries']
+ tds.publisher_retries = tds.c_config["publisher.http_retries"]
except Exception as e:
tds.publisher_retries = 3
diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py
index 0d7f220..fc7e865 100644
--- a/miss_htbt_service/mod/trapd_http_session.py
+++ b/miss_htbt_service/mod/trapd_http_session.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@ trapd_http_session establishes an http session for future use in publishing
messages to the dmaap cluster.
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import os
import requests
diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py
index d60f060..3e03999 100644
--- a/miss_htbt_service/mod/trapd_io.py
+++ b/miss_htbt_service/mod/trapd_io.py
@@ -1,9 +1,7 @@
# ============LICENSE_START=======================================================)
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@
"""
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
# basics
import datetime
@@ -36,6 +34,7 @@ import string
import time
import traceback
import unicodedata
+
# dcae_snmptrap
import mod.trapd_settings as tds
from mod.trapd_exit import cleanup_and_exit
@@ -48,7 +47,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def roll_all_logs():
+# def roll_all_logs():
# """
# roll all active logs to timestamped version, open new one
# based on frequency defined in files.roll_frequency
@@ -107,7 +106,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def open_eelf_logs():
+# def open_eelf_logs():
# """
# open various (multiple ???) logs
# """
@@ -159,7 +158,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def roll_file(_loc_file_name):
+# def roll_file(_loc_file_name):
# """
# move active file to timestamped archive
# """
@@ -190,7 +189,7 @@ prog_name = os.path.basename(__file__)
## # # # # # # # # # # # #
#
#
-#def open_file(_loc_file_name):
+# def open_file(_loc_file_name):
# """
# open _loc_file_name, return file handle
# """
@@ -214,7 +213,7 @@ prog_name = os.path.basename(__file__)
# """
#
#
-#def close_file(_loc_fd, _loc_filename):
+# def close_file(_loc_fd, _loc_filename):
#
# try:
#
@@ -231,7 +230,7 @@ prog_name = os.path.basename(__file__)
## is released for python via LOG-161
## # # # # # # # # # ## # # # # # # #
#
-#def ecomp_logger(_log_type, _sev, _error_code, _msg):
+# def ecomp_logger(_log_type, _sev, _error_code, _msg):
# """
# Log to ecomp-style logfiles. Logs include:
#
@@ -389,4 +388,4 @@ def stdout_logger(_msg):
t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3]
- print('%s %s' % (t_out, _msg))
+ print("%s %s" % (t_out, _msg))
diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py
index d2db83d..47612bd 100644
--- a/miss_htbt_service/mod/trapd_runtime_pid.py
+++ b/miss_htbt_service/mod/trapd_runtime_pid.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@ trapd_runtime_pid maintains a 'PID file' (file that contains the
PID of currently running trap receiver)
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import logging
import os
@@ -50,8 +48,8 @@ def save_pid(_pid_file_name):
"""
try:
- pid_fd = open(_pid_file_name, 'w')
- pid_fd.write('%d' % os.getpid())
+ pid_fd = open(_pid_file_name, "w")
+ pid_fd.write("%d" % os.getpid())
pid_fd.close()
except IOError:
print("IOError saving PID file %s :" % _pid_file_name)
diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py
index 0c8457f..3e7f141 100644
--- a/miss_htbt_service/mod/trapd_settings.py
+++ b/miss_htbt_service/mod/trapd_settings.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================)
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +17,7 @@
"""
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
def init():
diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py
index db6bd17..1b00336 100644
--- a/miss_htbt_service/mod/trapd_vnf_table.py
+++ b/miss_htbt_service/mod/trapd_vnf_table.py
@@ -1,11 +1,9 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -44,45 +42,47 @@ _logger = logging.getLogger(__name__)
def hb_properties():
- #Read the hbproperties.yaml for postgress and CBS related data
- s=open(hb_properties_file, 'r')
- a=yaml.full_load(s)
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
- dbName = a['pg_dbName']
- db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
- s.close()
- return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
-
-
-def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ # Read the hbproperties.yaml for postgress and CBS related data
+ s = open(hb_properties_file, "r")
+ a = yaml.full_load(s)
+ ip_address = a["pg_ipAddress"]
+ port_num = a["pg_portNum"]
+ user_name = a["pg_userName"]
+ password = a["pg_passwd"]
+ dbName = a["pg_dbName"]
+ db_name = dbName.lower()
+ cbs_polling_required = a["CBS_polling_allowed"]
+ cbs_polling_interval = a["CBS_polling_interval"]
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
+
+def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name):
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1")
+ _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1")
except Exception as e:
return None
return _db_status
-def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name):
+
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2")
+ _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2")
except Exception as e:
return None
return _db_status
-def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name):
+
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"hb_common")
+ _db_status = pm.db_table_creation_check(connection_db, "hb_common")
except Exception as e:
return None
@@ -90,35 +90,36 @@ def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name)
def verify_cbspolling():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
cbs.poll_cbs(10)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
def verify_fetch_json_file():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
- db.fetch_json_file()
- result = True
+ db.fetch_json_file()
+ result = True
except Exception as e:
- result = False
+ result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_misshtbtdmain():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
db.main()
@@ -126,69 +127,125 @@ def verify_misshtbtdmain():
except Exception as e:
result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_dbmonitoring():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
jsfile = db.fetch_json_file()
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties()
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
- dbmon.db_monitoring(hbc_pid,jsfile,user_name,password,ip_address,port_num,db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
+ dbmon.db_monitoring(hbc_pid, jsfile, user_name, password, ip_address, port_num, db_name)
result = True
except Exception as e:
print("Message process error - %s" % e)
result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_dbmon_startup():
try:
- p = subprocess.Popen(['./miss_htbt_service/db_monitoring.py'], stdout=subprocess.PIPE,shell=True)
+ p = subprocess.Popen(["./miss_htbt_service/db_monitoring.py"], stdout=subprocess.PIPE, shell=True)
time.sleep(1)
except Exception as e:
return None
return True
+
def verify_sendControlLoop_VNF_ONSET():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VNF",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VM_ONSET():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VM",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VNF_ABATED():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VNF",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VM_ABATED():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VM",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return