aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHansen, Tony (th1395) <th1395@att.com>2021-12-01 22:01:56 +0000
committerHansen, Tony (th1395) <th1395@att.com>2021-12-02 19:58:31 +0000
commit2108563705a2ec8bb80029d36122c69fa4d06df5 (patch)
tree1453b42bc3535635b136d0963a290243e26ae35f
parent8d7c0201456b7f9af6e91fea90354f4c3de323fe (diff)
run the black formatting tool on python code
also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com> Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
-rw-r--r--Changelog.md1
-rw-r--r--miss_htbt_service/__init__.py5
-rw-r--r--miss_htbt_service/cbs_polling.py24
-rw-r--r--miss_htbt_service/check_health.py47
-rw-r--r--miss_htbt_service/db_monitoring.py243
-rw-r--r--miss_htbt_service/get_logger.py10
-rw-r--r--miss_htbt_service/htbtworker.py114
-rw-r--r--miss_htbt_service/misshtbtd.py257
-rw-r--r--miss_htbt_service/mod/trapd_exit.py11
-rw-r--r--miss_htbt_service/mod/trapd_get_cbs_config.py32
-rw-r--r--miss_htbt_service/mod/trapd_http_session.py6
-rw-r--r--miss_htbt_service/mod/trapd_io.py23
-rw-r--r--miss_htbt_service/mod/trapd_runtime_pid.py10
-rw-r--r--miss_htbt_service/mod/trapd_settings.py6
-rw-r--r--miss_htbt_service/mod/trapd_vnf_table.py193
-rw-r--r--setup.py20
-rw-r--r--tests/monkey_psycopg2.py27
-rw-r--r--tests/test_binding.py85
-rw-r--r--tests/test_check_health.py34
-rw-r--r--tests/test_get_logger.py12
-rw-r--r--tests/test_htbtworker.py15
-rw-r--r--tests/test_trapd_exit.py15
-rw-r--r--tests/test_trapd_get_cbs_config.py13
-rw-r--r--tests/test_trapd_http_session.py5
-rw-r--r--tests/test_trapd_runtime_pid.py16
-rw-r--r--tests/test_trapd_settings.py9
-rw-r--r--tests/test_trapd_vnf_table.py51
27 files changed, 788 insertions, 496 deletions
diff --git a/Changelog.md b/Changelog.md
index 5aeff31..2ecfd88 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [2.4.0] - 2021/10/12
### Changed
- [DCAEGEN2-2939] Removed unused code (config\_notif.py)
+- [DCAEGEN2-2995] run the black formatting tool on python code
### Fixed
- [DCAEGEN2-2832] Pod become unready state
- [DCAEGEN2-2872] No such file or directory error and stop working
diff --git a/miss_htbt_service/__init__.py b/miss_htbt_service/__init__.py
index 1ae08ca..a56d536 100644
--- a/miss_htbt_service/__init__.py
+++ b/miss_htbt_service/__init__.py
@@ -1,5 +1,5 @@
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# ============LICENSE_START=======================================================
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,4 +16,3 @@
# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest
# best practice guideline
-
diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py
index e7bdef1..53585ba 100644
--- a/miss_htbt_service/cbs_polling.py
+++ b/miss_htbt_service/cbs_polling.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -35,7 +35,15 @@ _logger = logging.getLogger(__name__)
def poll_cbs(current_pid: int) -> None:
jsfile = db.fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
msg = "CBSP:Main process ID in hb_common is %d", hbc_pid
_logger.info(msg)
@@ -43,13 +51,13 @@ def poll_cbs(current_pid: int) -> None:
_logger.info(msg)
msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval
_logger.info(msg)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
cbs_polling_interval = "30"
time.sleep(int(cbs_polling_interval))
hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
if current_pid == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING":
_logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION")
state = "RECONFIGURATION"
@@ -60,7 +68,7 @@ def poll_cbs(current_pid: int) -> None:
def cbs_polling_loop(current_pid: int):
- get_logger.configure_logger('cbs_polling')
+ get_logger.configure_logger("cbs_polling")
while True:
poll_cbs(current_pid)
diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py
index 4d04210..71f7c58 100644
--- a/miss_htbt_service/check_health.py
+++ b/miss_htbt_service/check_health.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,38 +22,39 @@ from urllib import parse
class GetHandler(BaseHTTPRequestHandler):
-
def do_GET(self):
parsed_path = parse.urlparse(self.path)
- message = '\n'.join([
- 'CLIENT VALUES:',
- 'client_address=%s (%s)' % (self.client_address, self.address_string()),
- 'command=%s' % self.command,
- 'path=%s' % self.path,
- 'real path=%s' % parsed_path.path,
- 'query=%s' % parsed_path.query,
- 'request_version=%s' % self.request_version,
- '',
- 'SERVER VALUES:',
- 'server_version=%s' % self.server_version,
- 'sys_version=%s' % self.sys_version,
- 'protocol_version=%s' % self.protocol_version,
- '',
- ])
+ message = "\n".join(
+ [
+ "CLIENT VALUES:",
+ "client_address=%s (%s)" % (self.client_address, self.address_string()),
+ "command=%s" % self.command,
+ "path=%s" % self.path,
+ "real path=%s" % parsed_path.path,
+ "query=%s" % parsed_path.query,
+ "request_version=%s" % self.request_version,
+ "",
+ "SERVER VALUES:",
+ "server_version=%s" % self.server_version,
+ "sys_version=%s" % self.sys_version,
+ "protocol_version=%s" % self.protocol_version,
+ "",
+ ]
+ )
self.send_response(200)
self.end_headers()
- self.wfile.write(bytes(message, 'utf-8'))
+ self.wfile.write(bytes(message, "utf-8"))
return
def do_POST(self):
- content_len = int(self.headers.get('content-length', 0))
+ content_len = int(self.headers.get("content-length", 0))
post_body = self.rfile.read(content_len)
self.send_response(200)
self.end_headers()
data = json.loads(post_body)
- self.wfile.write(bytes(data['health'], 'utf-8'))
+ self.wfile.write(bytes(data["health"], "utf-8"))
return
@@ -61,9 +62,9 @@ def start_health_check_server() -> None:
from http.server import HTTPServer
server = HTTPServer(("", 10002), GetHandler)
- print('Starting server at http://localhost:10002')
+ print("Starting server at http://localhost:10002")
server.serve_forever()
-if __name__ == '__main__':
+if __name__ == "__main__":
start_health_check_server()
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index a405876..32a2d5c 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -37,81 +37,100 @@ import get_logger
_logger = logging.getLogger(__name__)
-def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time,
- closed_control_loop_name, version, target):
+def sendControlLoopEvent(
+ CLType,
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+):
msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type
_logger.info(msg)
if CLType == "ONSET":
_logger.info("DBM:Heartbeat not received, raising alarm event")
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
elif CLType == "ABATED":
_logger.info("DBM:Heartbeat received, clearing alarm event")
# last_date_time = datetime.datetime.now()
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
else:
@@ -131,7 +150,7 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_sc
msg = "DBM:Status code for sending the control loop event is", ret
_logger.info(msg)
except Exception as err:
- msg = 'Message send failure : ', err
+ msg = "Message send failure : ", err
_logger.error(msg)
return True
@@ -140,22 +159,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
while True:
time.sleep(20)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break
try:
- with open(json_file, 'r') as outfile:
+ with open(json_file, "r") as outfile:
cfg = json.load(outfile)
- pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
+ pol_url = str(cfg["streams_publishes"]["dcae_cl_out"]["dmaap_info"]["topic_url"])
except Exception as err:
- msg = 'Json file process error : ', err
+ msg = "Json file process error : ", err
_logger.error(msg)
continue
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING":
@@ -170,9 +191,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
_logger.info("DBM:Waiting for hb_common state to become RUNNING")
break
- cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
- "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
- "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,))
+ cur.execute(
+ "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
+ "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
+ "target, version FROM vnf_table_1 WHERE event_name = %s",
+ (event_name,),
+ )
rows = cur.fetchall()
validity_flag = rows[0][0]
source_name_count = rows[0][1]
@@ -189,8 +213,11 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if validity_flag == 1:
for source_name_key in range(source_name_count):
epoc_time = int(round(time.time() * 1000))
- cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
- "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1)))
+ cur.execute(
+ "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
+ "event_name = %s AND source_name_key = %s",
+ (event_name, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -198,20 +225,44 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
srcName = row[0][1]
cl_flag = row[0][2]
if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0:
- sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 1
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1:
- sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 0
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
else: # pragma: no cover
@@ -233,15 +284,23 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if __name__ == "__main__":
- get_logger.configure_logger('db_monitoring')
+ get_logger.configure_logger("db_monitoring")
_logger.info("DBM: DBM Process started")
current_pid = sys.argv[1]
jsfile = sys.argv[2]
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
while True:
db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break
diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py
index 55286eb..8068f6b 100644
--- a/miss_htbt_service/get_logger.py
+++ b/miss_htbt_service/get_logger.py
@@ -1,6 +1,6 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
import logging.handlers
LOG_LEVEL = logging.DEBUG
-LOG_FORMAT = '%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s'
+LOG_FORMAT = "%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s"
LOG_MAXSIZE = 10485760 * 5
LOG_BACKUP_COUNT = 10
@@ -39,9 +39,9 @@ def configure_logger(proc_name: str) -> None:
# Add rotating log file handler
if proc_name:
- logfile_path = './hb_%s_logs.txt' % proc_name
+ logfile_path = "./hb_%s_logs.txt" % proc_name
else:
- logfile_path = './hb_logs.txt'
+ logfile_path = "./hb_logs.txt"
fhandler = logging.handlers.RotatingFileHandler(logfile_path, maxBytes=LOG_MAXSIZE, backupCount=LOG_BACKUP_COUNT)
fhandler.setFormatter(formatter)
root.addHandler(fhandler)
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index be1b6aa..44436a2 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"):
with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
cfg = json.load(outfile)
elif i == 2:
- with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile:
+ with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile:
cfg = json.load(outfile)
return cfg
@@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
sleep_duration = 20
while True:
time.sleep(sleep_duration)
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
+ mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
if hbc_state == "RECONFIGURATION":
_logger.info("HBT:Waiting for hb_common state to become RUNNING")
time.sleep(10)
else:
break
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
connection_db = 0
else:
@@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
msg = "\n\nHBT:eventnameList values ", eventnameList
_logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
- get_url = mr_url + '/DefaultGroup/1?timeout=15000'
+ get_url = mr_url + "/DefaultGroup/1?timeout=15000"
else:
- get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000'
+ get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000"
msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jsonobj = read_json_file(i)
jobj = []
jobj.append(jsonobj)
@@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
# If mrstatus in message body indicates some information, not json msg.
if "mrstatus" in inputString:
continue
- jlist = inputString.split('\n')
+ jlist = inputString.split("\n")
# Process the DMaaP input message retreived
error = False
for line in jlist:
try:
jobj = json.loads(line)
except ValueError:
- msg = 'HBT:Decoding JSON has failed'
+ msg = "HBT:Decoding JSON has failed"
_logger.error(msg)
error = True
break
@@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
continue
for item in jobj:
try:
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jitem = jsonobj
else:
jitem = json.loads(item)
- srcname = (jitem['event']['commonEventHeader']['sourceName'])
- lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+ srcname = jitem["event"]["commonEventHeader"]["sourceName"]
+ lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
# if lastEpochMicrosec looks like microsec, align it with millisec
if lastepo > 1000000000000000:
lastepo = int(lastepo / 1000)
- seqnum = (jitem['event']['commonEventHeader']['sequence'])
- eventName = (jitem['event']['commonEventHeader']['eventName'])
+ seqnum = jitem["event"]["commonEventHeader"]["sequence"]
+ eventName = jitem["event"]["commonEventHeader"]["eventName"]
except Exception as err:
msg = "HBT message process error - ", err
_logger.error(msg)
@@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_table_creation_check(connection_db, "vnf_table_2") is False:
msg = "HBT:Creating vnf_table_2"
_logger.info(msg)
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_2 (
EVENT_NAME varchar,
SOURCE_NAME_KEY integer,
@@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
LAST_EPO_TIME BIGINT,
SOURCE_NAME varchar,
CL_FLAG integer
- )""")
+ )"""
+ )
else:
msg = "HBT:vnf_table_2 is already there"
_logger.info(msg)
if eventName in eventnameList: # pragma: no cover
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
break
cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,))
row = cur.fetchone()
@@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
cl_flag = 0
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
- cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND "
- "source_name_key = %s", (eventName, (source_name_key + 1)))
+ cur.execute(
+ "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
+ (eventName, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_srcname == srcname:
msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
- "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- (lastepo, srcname, eventName, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
+ "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
+ (lastepo, srcname, eventName, (source_name_key + 1)),
+ )
source_name_key = source_name_count
break
else:
@@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if source_name_count == (source_name_key + 1):
source_name_key = source_name_count + 1
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else:
_logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
commit_db(connection_db)
commit_and_close_db(connection_db)
- if os.getenv('pytest', "") != 'test':
+ if os.getenv("pytest", "") != "test":
cur.close()
def postgres_db_open(username, password, host, port, database_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
return connection
def db_table_creation_check(connection_db, table_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
cur = connection_db.cursor()
@@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name):
else:
return False
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
finally:
cur.close()
def commit_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
def commit_and_close_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
-if __name__ == '__main__':
- get_logger.configure_logger('htbtworker')
+if __name__ == "__main__":
+ get_logger.configure_logger("htbtworker")
jsfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
msg = "HBT:The config file name passed is -%s", jsfile
_logger.info(msg)
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
process_msg(jsfile, user_name, password, ip_address, port_num, db_name)
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 6be2260..5ba0860 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -2,9 +2,9 @@
# ============LICENSE_START=======================================================
# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json"
def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
from psycopg2 import connect
+
try:
con = connect(user=user_name, host=ip_address, password=password)
database_name = db_name
@@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password
def read_hb_common(user_name, password, ip_address, port_num, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
hbc_pid = 10
hbc_srcName = "srvc_name"
hbc_time = 1584595881
@@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name):
def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name):
current_time = int(round(time.time()))
source_name = socket.gethostname()
- source_name = source_name + "-" + os.getenv('SERVICE_NAME', "")
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test':
+ source_name = source_name + "-" + os.getenv("SERVICE_NAME", "")
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test":
connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "hb_common") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE hb_common (
PROCESS_ID integer primary key,
SOURCE_NAME varchar,
LAST_ACCESSED_TIME integer,
CURRENT_STATE varchar
- )""")
+ )"""
+ )
cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state))
_logger.info("MSHBT:Created hb_common DB and updated new values")
elif update_flg == 1:
- cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
- "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name))
+ cur.execute(
+ "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s "
+ "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s",
+ (current_time, state, process_id, source_name),
+ )
_logger.info("MSHBT:Updated hb_common DB with new values")
heartbeat.commit_and_close_db(connection_db)
cur.close()
def create_update_vnf_table_1(jsfile, update_db, connection_db):
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- hbcfg = cfg['heartbeat_config']
+ hbcfg = cfg["heartbeat_config"]
jhbcfg = json.loads(hbcfg)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
else:
cur = connection_db.cursor()
if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False:
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_1 (
EVENT_NAME varchar primary key,
HEARTBEAT_MISSED_COUNT integer,
@@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
VERSION varchar,
SOURCE_NAME_COUNT integer,
VALIDITY_FLAG integer
- )""")
+ )"""
+ )
_logger.info("MSHBT:Created vnf_table_1 table")
if update_db == 1:
cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1")
@@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db):
# Put some initial values into the queue
cur.execute("SELECT event_name FROM vnf_table_1")
vnf_list = [item[0] for item in cur.fetchall()]
- for vnf in (jhbcfg['vnfs']):
- nfc = vnf['eventName']
+ for vnf in jhbcfg["vnfs"]:
+ nfc = vnf["eventName"]
validity_flag = 1
source_name_count = 0
- missed = vnf['heartbeatcountmissed']
- intvl = vnf['heartbeatinterval']
- clloop = vnf['closedLoopControlName']
- policyVersion = vnf['policyVersion']
- policyName = vnf['policyName']
- policyScope = vnf['policyScope']
- target_type = vnf['target_type']
- target = vnf['target']
- version = vnf['version']
-
- if envPytest == 'test':
+ missed = vnf["heartbeatcountmissed"]
+ intvl = vnf["heartbeatinterval"]
+ clloop = vnf["closedLoopControlName"]
+ policyVersion = vnf["policyVersion"]
+ policyName = vnf["policyName"]
+ policyScope = vnf["policyScope"]
+ target_type = vnf["target_type"]
+ target = vnf["target"]
+ version = vnf["version"]
+
+ if envPytest == "test":
# skip executing SQL in test
continue
if nfc not in vnf_list:
- cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
- (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target,
- version, source_name_count, validity_flag))
+ cur.execute(
+ "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
+ (
+ nfc,
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ source_name_count,
+ validity_flag,
+ ),
+ )
_logger.debug("Inserted new event_name = %s into vnf_table_1", nfc)
else:
- cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
+ cur.execute(
+ """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s,
CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s,
TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""",
- (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version,
- validity_flag, nfc))
- if envPytest != 'test':
+ (
+ missed,
+ intvl,
+ clloop,
+ policyVersion,
+ policyName,
+ policyScope,
+ target_type,
+ target,
+ version,
+ validity_flag,
+ nfc,
+ ),
+ )
+ if envPytest != "test":
cur.close()
_logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file")
@@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile):
def read_hb_properties_default():
# Read the hbproperties.yaml for postgress and CBS related data
- s = open(hb_properties_file, 'r')
+ s = open(hb_properties_file, "r")
a = yaml.full_load(s)
- if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None):
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
+ if (
+ (os.getenv("pg_ipAddress") is None)
+ or (os.getenv("pg_portNum") is None)
+ or (os.getenv("pg_userName") is None)
+ or (os.getenv("pg_passwd") is None)
+ ):
+ ip_address = a["pg_ipAddress"]
+ port_num = a["pg_portNum"]
+ user_name = a["pg_userName"]
+ password = a["pg_passwd"]
else:
- ip_address = os.getenv('pg_ipAddress')
- port_num = os.getenv('pg_portNum')
- user_name = os.getenv('pg_userName')
- password = os.getenv('pg_passwd')
+ ip_address = os.getenv("pg_ipAddress")
+ port_num = os.getenv("pg_portNum")
+ user_name = os.getenv("pg_userName")
+ password = os.getenv("pg_passwd")
- dbName = a['pg_dbName']
+ dbName = a["pg_dbName"]
db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
+ cbs_polling_required = a["CBS_polling_allowed"]
+ cbs_polling_interval = a["CBS_polling_interval"]
s.close()
return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
def read_hb_properties(jsfile):
try:
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
except Exception as err:
msg = "CBS Json file load error - " + str(err)
@@ -242,20 +282,20 @@ def read_hb_properties(jsfile):
return read_hb_properties_default()
try:
- ip_address = str(cfg['pg_ipAddress'])
- port_num = str(cfg['pg_portNum'])
- user_name = str(cfg['pg_userName'])
- password = str(cfg['pg_passwd'])
- dbName = str(cfg['pg_dbName'])
+ ip_address = str(cfg["pg_ipAddress"])
+ port_num = str(cfg["pg_portNum"])
+ user_name = str(cfg["pg_userName"])
+ password = str(cfg["pg_passwd"])
+ dbName = str(cfg["pg_dbName"])
db_name = dbName.lower()
- cbs_polling_required = str(cfg['CBS_polling_allowed'])
- cbs_polling_interval = str(cfg['CBS_polling_interval'])
- consumer_id = str(cfg['consumerID'])
- group_id = str(cfg['groupID'])
- os.environ['consumerID'] = consumer_id
- os.environ['groupID'] = group_id
+ cbs_polling_required = str(cfg["CBS_polling_allowed"])
+ cbs_polling_interval = str(cfg["CBS_polling_interval"])
+ consumer_id = str(cfg["consumerID"])
+ group_id = str(cfg["groupID"])
+ os.environ["consumerID"] = consumer_id
+ os.environ["groupID"] = group_id
if "SERVICE_NAME" in cfg:
- os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME'])
+ os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"])
except Exception as err:
msg = "CBS Json file read parameter error - " + str(err)
_logger.error(msg)
@@ -273,7 +313,7 @@ def fetch_json_file() -> str:
# Try to get config from CBS. If succeeded, config json is stored to tds.c_config .
if get_cbs_config():
# Save config to temporary file
- with tempfile.NamedTemporaryFile('w', delete=False) as temp:
+ with tempfile.NamedTemporaryFile("w", delete=False) as temp:
_logger.info("MSHBD: New config saved to temp file %s", temp.name)
json.dump(tds.c_config, temp)
# Swap current config with downloaded config
@@ -289,8 +329,8 @@ def fetch_json_file() -> str:
def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name):
- envPytest = os.getenv('pytest', "")
- if envPytest != 'test': # pragma: no cover
+ envPytest = os.getenv("pytest", "")
+ if envPytest != "test": # pragma: no cover
if update_db == 0:
create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name
@@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current):
if len(job_list) == 0:
p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,))
time.sleep(1)
- p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,))
+ p2 = multiprocessing.Process(
+ target=db_monitoring_process,
+ args=(
+ pid_current,
+ jsfile,
+ ),
+ )
p1.start()
time.sleep(1)
p2.start()
@@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current):
def main():
- get_logger.configure_logger('misshtbtd')
+ get_logger.configure_logger("misshtbtd")
pid_current = os.getpid()
hc_proc = multiprocessing.Process(target=check_health.start_health_check_server)
cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,))
@@ -334,15 +380,32 @@ def main():
job_list = []
jsfile = fetch_json_file()
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile)
- msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = read_hb_properties(jsfile)
+ msg = (
+ "MSHBT:HB Properties -",
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ )
_logger.info(msg)
update_db = 0
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
state = "RECONFIGURATION"
update_flg = 0
create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
- if cbs_polling_required == 'True':
+ if cbs_polling_required == "True":
# note: cbs_polling process must be started after `hb_common` table created
cbs_polling_proc.start()
_logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid)
@@ -350,17 +413,27 @@ def main():
_logger.info("MSHBD:Now be in a continuous loop")
i = 0
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time
_logger.info(msg)
current_time = int(round(time.time()))
time_difference = current_time - hbc_time
- msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference
+ msg = (
+ "MSHBD:pid,srcName,state,time,ctime,timeDiff is",
+ hbc_pid,
+ hbc_srcName,
+ hbc_state,
+ hbc_time,
+ current_time,
+ time_difference,
+ )
_logger.info(msg)
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
if i == 2:
hbc_pid = pid_current
source_name = hbc_srcName
@@ -376,9 +449,13 @@ def main():
if hbc_state == "RUNNING":
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
elif hbc_state == "RECONFIGURATION":
- _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes")
+ _logger.info(
+ "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes"
+ )
jsfile = fetch_json_file()
update_db = 1
create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name)
@@ -387,7 +464,9 @@ def main():
job_list = create_process(job_list, jsfile, pid_current)
state = "RUNNING"
update_flg = 1
- create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping")
@@ -395,7 +474,11 @@ def main():
_logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE")
else:
jsfile = fetch_json_file()
- msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current
+ msg = (
+ "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s",
+ jsfile,
+ pid_current,
+ )
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
else:
@@ -409,13 +492,17 @@ def main():
msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current
_logger.info(msg)
job_list = create_process(job_list, jsfile, pid_current)
- hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
update_flg = 1
- create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name)
+ create_update_hb_common(
+ update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name
+ )
else:
_logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR")
time.sleep(25)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
i = i + 1
if i > 5:
_logger.info("Terminating main process for pytest")
@@ -451,5 +538,5 @@ def main():
cbs_polling_proc.join()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py
index c741fe5..aae12bb 100644
--- a/miss_htbt_service/mod/trapd_exit.py
+++ b/miss_htbt_service/mod/trapd_exit.py
@@ -1,9 +1,7 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,7 +21,7 @@ trapc_exit_snmptrapd is responsible for removing any existing runtime PID
file, and exiting with the provided (param 1) exit code
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import sys
import os
@@ -62,6 +60,7 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name):
rc = rm_pid(_pid_file_name)
sys.exit(_loc_exit_code)
+
# # # # # # # # # # # # #
# fx: cleanup_and_exit
# - remove pid file
@@ -89,5 +88,3 @@ def cleanup(_loc_exit_code, _pid_file_name):
if _pid_file_name is not None:
rc = rm_pid(_pid_file_name)
-
-
diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py
index 70dac53..f9fd4c3 100644
--- a/miss_htbt_service/mod/trapd_get_cbs_config.py
+++ b/miss_htbt_service/mod/trapd_get_cbs_config.py
@@ -1,10 +1,8 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,7 +23,7 @@ env variable that specifies JSON equiv of CBS config (typically used for
testing purposes)
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import json
import os
@@ -36,7 +34,7 @@ import traceback
import collections.abc
from onap_dcae_cbs_docker_client.client import get_config
from mod import trapd_settings as tds
-from mod.trapd_exit import cleanup,cleanup_and_exit
+from mod.trapd_exit import cleanup, cleanup_and_exit
from mod.trapd_io import stdout_logger
prog_name = os.path.basename(__file__)
@@ -75,46 +73,44 @@ def get_cbs_config():
_cbs_sim_json_file = os.getenv("CBS_HTBT_JSON", "None")
except Exception as e:
stdout_logger(msg)
- cleanup(1,None)
+ cleanup(1, None)
return False
msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting"
if _cbs_sim_json_file == "None":
stdout_logger(msg)
- cleanup(1,None)
+ cleanup(1, None)
return False
else:
- msg = ("ONAP controller override specified via CBS_HTBT_JSON: %s" %
- _cbs_sim_json_file)
+ msg = "ONAP controller override specified via CBS_HTBT_JSON: %s" % _cbs_sim_json_file
stdout_logger(msg)
- msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \
- " (invalid json?) - FATAL ERROR, exiting"
+ msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting"
try:
tds.c_config = json.load(open(_cbs_sim_json_file))
except Exception as e:
stdout_logger(msg)
- cleanup_and_exit(0,None)
+ cleanup_and_exit(0, None)
# recalc timeout, set default if not present
try:
- tds.timeout_seconds = tds.c_config['publisher.http_timeout_milliseconds'] / 1000.0
+ tds.timeout_seconds = tds.c_config["publisher.http_timeout_milliseconds"] / 1000.0
except Exception as e:
tds.timeout_seconds = 1.5
# recalc seconds_between_retries, set default if not present
try:
- tds.seconds_between_retries = tds.c_config['publisher.http_milliseconds_between_retries'] / 1000.0
+ tds.seconds_between_retries = tds.c_config["publisher.http_milliseconds_between_retries"] / 1000.0
except Exception as e:
- tds.seconds_between_retries = .750
+ tds.seconds_between_retries = 0.750
# recalc min_severity_to_log, set default if not present
try:
- tds.minimum_severity_to_log = tds.c_config['files.minimum_severity_to_log']
+ tds.minimum_severity_to_log = tds.c_config["files.minimum_severity_to_log"]
except Exception as e:
tds.minimum_severity_to_log = 3
try:
- tds.publisher_retries = tds.c_config['publisher.http_retries']
+ tds.publisher_retries = tds.c_config["publisher.http_retries"]
except Exception as e:
tds.publisher_retries = 3
diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py
index 0d7f220..fc7e865 100644
--- a/miss_htbt_service/mod/trapd_http_session.py
+++ b/miss_htbt_service/mod/trapd_http_session.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@ trapd_http_session establishes an http session for future use in publishing
messages to the dmaap cluster.
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import os
import requests
diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py
index d60f060..3e03999 100644
--- a/miss_htbt_service/mod/trapd_io.py
+++ b/miss_htbt_service/mod/trapd_io.py
@@ -1,9 +1,7 @@
# ============LICENSE_START=======================================================)
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@
"""
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
# basics
import datetime
@@ -36,6 +34,7 @@ import string
import time
import traceback
import unicodedata
+
# dcae_snmptrap
import mod.trapd_settings as tds
from mod.trapd_exit import cleanup_and_exit
@@ -48,7 +47,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def roll_all_logs():
+# def roll_all_logs():
# """
# roll all active logs to timestamped version, open new one
# based on frequency defined in files.roll_frequency
@@ -107,7 +106,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def open_eelf_logs():
+# def open_eelf_logs():
# """
# open various (multiple ???) logs
# """
@@ -159,7 +158,7 @@ prog_name = os.path.basename(__file__)
# # # # # # # # # # ## # # # # # # #
-#def roll_file(_loc_file_name):
+# def roll_file(_loc_file_name):
# """
# move active file to timestamped archive
# """
@@ -190,7 +189,7 @@ prog_name = os.path.basename(__file__)
## # # # # # # # # # # # #
#
#
-#def open_file(_loc_file_name):
+# def open_file(_loc_file_name):
# """
# open _loc_file_name, return file handle
# """
@@ -214,7 +213,7 @@ prog_name = os.path.basename(__file__)
# """
#
#
-#def close_file(_loc_fd, _loc_filename):
+# def close_file(_loc_fd, _loc_filename):
#
# try:
#
@@ -231,7 +230,7 @@ prog_name = os.path.basename(__file__)
## is released for python via LOG-161
## # # # # # # # # # ## # # # # # # #
#
-#def ecomp_logger(_log_type, _sev, _error_code, _msg):
+# def ecomp_logger(_log_type, _sev, _error_code, _msg):
# """
# Log to ecomp-style logfiles. Logs include:
#
@@ -389,4 +388,4 @@ def stdout_logger(_msg):
t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3]
- print('%s %s' % (t_out, _msg))
+ print("%s %s" % (t_out, _msg))
diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py
index d2db83d..47612bd 100644
--- a/miss_htbt_service/mod/trapd_runtime_pid.py
+++ b/miss_htbt_service/mod/trapd_runtime_pid.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +19,7 @@ trapd_runtime_pid maintains a 'PID file' (file that contains the
PID of currently running trap receiver)
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
import logging
import os
@@ -50,8 +48,8 @@ def save_pid(_pid_file_name):
"""
try:
- pid_fd = open(_pid_file_name, 'w')
- pid_fd.write('%d' % os.getpid())
+ pid_fd = open(_pid_file_name, "w")
+ pid_fd.write("%d" % os.getpid())
pid_fd.close()
except IOError:
print("IOError saving PID file %s :" % _pid_file_name)
diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py
index 0c8457f..3e7f141 100644
--- a/miss_htbt_service/mod/trapd_settings.py
+++ b/miss_htbt_service/mod/trapd_settings.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================)
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +17,7 @@
"""
"""
-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"
def init():
diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py
index db6bd17..1b00336 100644
--- a/miss_htbt_service/mod/trapd_vnf_table.py
+++ b/miss_htbt_service/mod/trapd_vnf_table.py
@@ -1,11 +1,9 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -44,45 +42,47 @@ _logger = logging.getLogger(__name__)
def hb_properties():
- #Read the hbproperties.yaml for postgress and CBS related data
- s=open(hb_properties_file, 'r')
- a=yaml.full_load(s)
- ip_address = a['pg_ipAddress']
- port_num = a['pg_portNum']
- user_name = a['pg_userName']
- password = a['pg_passwd']
- dbName = a['pg_dbName']
- db_name = dbName.lower()
- cbs_polling_required = a['CBS_polling_allowed']
- cbs_polling_interval = a['CBS_polling_interval']
- s.close()
- return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
-
-
-def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+ # Read the hbproperties.yaml for postgress and CBS related data
+ s = open(hb_properties_file, "r")
+ a = yaml.full_load(s)
+ ip_address = a["pg_ipAddress"]
+ port_num = a["pg_portNum"]
+ user_name = a["pg_userName"]
+ password = a["pg_passwd"]
+ dbName = a["pg_dbName"]
+ db_name = dbName.lower()
+ cbs_polling_required = a["CBS_polling_allowed"]
+ cbs_polling_interval = a["CBS_polling_interval"]
+ s.close()
+ return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
+
+
+def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name):
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1")
+ _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1")
except Exception as e:
return None
return _db_status
-def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name):
+
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2")
+ _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2")
except Exception as e:
return None
return _db_status
-def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name):
- connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name)
+def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name):
+
+ connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
try:
- _db_status=pm.db_table_creation_check(connection_db,"hb_common")
+ _db_status = pm.db_table_creation_check(connection_db, "hb_common")
except Exception as e:
return None
@@ -90,35 +90,36 @@ def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name)
def verify_cbspolling():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
cbs.poll_cbs(10)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
def verify_fetch_json_file():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
- db.fetch_json_file()
- result = True
+ db.fetch_json_file()
+ result = True
except Exception as e:
- result = False
+ result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_misshtbtdmain():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
db.main()
@@ -126,69 +127,125 @@ def verify_misshtbtdmain():
except Exception as e:
result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_dbmonitoring():
- os.environ['pytest']='test'
- os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static'
- os.environ['CONSUL_HOST']='localhost'
- os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static'
+ os.environ["pytest"] = "test"
+ os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static"
+ os.environ["CONSUL_HOST"] = "localhost"
+ os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static"
try:
jsfile = db.fetch_json_file()
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties()
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name)
- dbmon.db_monitoring(hbc_pid,jsfile,user_name,password,ip_address,port_num,db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
+ dbmon.db_monitoring(hbc_pid, jsfile, user_name, password, ip_address, port_num, db_name)
result = True
except Exception as e:
print("Message process error - %s" % e)
result = False
print(result)
- os.unsetenv('pytest')
- os.unsetenv('SERVICE_NAME')
- os.unsetenv('CONSUL_HOST')
- os.unsetenv('HOSTNAME')
+ os.unsetenv("pytest")
+ os.unsetenv("SERVICE_NAME")
+ os.unsetenv("CONSUL_HOST")
+ os.unsetenv("HOSTNAME")
return result
+
def verify_dbmon_startup():
try:
- p = subprocess.Popen(['./miss_htbt_service/db_monitoring.py'], stdout=subprocess.PIPE,shell=True)
+ p = subprocess.Popen(["./miss_htbt_service/db_monitoring.py"], stdout=subprocess.PIPE, shell=True)
time.sleep(1)
except Exception as e:
return None
return True
+
def verify_sendControlLoop_VNF_ONSET():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VNF",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VM_ONSET():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VM",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VNF_ABATED():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VNF",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
+
def verify_sendControlLoop_VM_ABATED():
try:
pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/"
- _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName")
+ _CL_return = dbmon.sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ "1.0",
+ "vFireWall",
+ "pscope",
+ "VM",
+ "srcname1",
+ 1541234567,
+ "SampleCLName",
+ "1.0",
+ "genVnfName",
+ )
except Exception as e:
return None
return _CL_return
diff --git a/setup.py b/setup.py
index 22af9ab..297d168 100644
--- a/setup.py
+++ b/setup.py
@@ -33,10 +33,10 @@ from setuptools import setup, find_packages
## from pip.download import PipSession
setup(
- name='miss_htbt_service',
- description='Missing heartbeat microservice to communicate with policy-engine',
- version='2.4.0',
- #packages=find_packages(exclude=["tests.*", "tests"]),
+ name="miss_htbt_service",
+ description="Missing heartbeat microservice to communicate with policy-engine",
+ version="2.4.0",
+ # packages=find_packages(exclude=["tests.*", "tests"]),
packages=find_packages(),
install_requires=[
"requests==2.23.0",
@@ -47,12 +47,12 @@ setup(
"HTTPretty==1.0.5",
"pyOpenSSL==20.0.1",
"Wheel==0.36.2",
- "psycopg2-binary==2.8.6"
+ "psycopg2-binary==2.8.6",
],
- author = "Vijay Venkatesh Kumar",
- author_email = "vv770d@att.com",
- license = "",
- keywords = "missing heartbeat microservice",
- url = "https://gerrit.onap.org/r/#/admin/projects/dcaegen2/platform/heartbeat",
+ author="Vijay Venkatesh Kumar",
+ author_email="vv770d@att.com",
+ license="",
+ keywords="missing heartbeat microservice",
+ url="https://gerrit.onap.org/r/#/admin/projects/dcaegen2/platform/heartbeat",
zip_safe=False,
)
diff --git a/tests/monkey_psycopg2.py b/tests/monkey_psycopg2.py
index 05f44b4..e993874 100644
--- a/tests/monkey_psycopg2.py
+++ b/tests/monkey_psycopg2.py
@@ -1,6 +1,5 @@
# ============LICENSE_START====================================================
-# =============================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -34,14 +33,15 @@ FORCE_CLOSE_FAILURE = False
# This variable is used by monkey_psycopg2.monkey_set_defaults(multiDbInfo)
# to set up default values to be returned by cursors statements.
-DEFAULT_MULTI_DBINFO = { }
+DEFAULT_MULTI_DBINFO = {}
+
class MockConn(object):
"""
mock Connection interface returned by psycopg2.connect()
"""
- def __init__(self, dbInfo = None):
+ def __init__(self, dbInfo=None):
self.curCmd = None
self.curCursor = None
self.monkey_setDbInfo(dbInfo)
@@ -49,7 +49,7 @@ class MockConn(object):
def monkey_setDbInfo(self, dbInfo):
"""
Set up a set of defaults for the cursors on "select" statements.
- The outer scope is a string that will be matched against the currently-active
+ The outer scope is a string that will be matched against the currently-active
select statement being executed.
If there is a match, the specified values are returned by the cursor.
dbconn.monkey_setDbInfo({
@@ -71,7 +71,7 @@ class MockConn(object):
"""
if FORCE_COMMIT_FAILURE:
raise psycopg2.DatabaseError(f"Unable to commit: force_commit_failure=<{FORCE_COMMIT_FAILURE}>")
-
+
def close(self):
"""
mock close
@@ -110,7 +110,7 @@ class MockConn(object):
self.curCursor = None
return self
- def execute(self, cmd, args = None):
+ def execute(self, cmd, args=None):
"""
mock execute
@@ -155,6 +155,7 @@ class MockConn(object):
print(f"fetchall() returning {self.curCursor}")
return self.curCursor
+
def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False, close=False):
print(f"monkey_reset_forces({connect}, {cursor}, {execute}, {commit}, {close})")
global FORCE_CONNECT_FAILURE
@@ -168,11 +169,12 @@ def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False
global FORCE_CLOSE_FAILURE
FORCE_CLOSE_FAILURE = close
+
def monkey_set_defaults(multiDbInfo):
"""
Set up a set of defaults for the cursors on "select" statements.
The outer scope gives a database name.
- The next level is a string that will be matched against the currently-active
+ The next level is a string that will be matched against the currently-active
select statement being executed.
If both match, the specified values are returned by the cursor.
monkey_psycopg2.monkey_set_defaults({
@@ -187,10 +189,11 @@ def monkey_set_defaults(multiDbInfo):
global DEFAULT_MULTI_DBINFO
DEFAULT_MULTI_DBINFO = multiDbInfo
+
def monkey_connect(database=None, host=None, port=None, user=None, password=None):
"""
Mock psycopg2 connection.
- Returns a mock connection.
+ Returns a mock connection.
Will raise an exception if value of 'FORCE_CONNECT_FAILURE' is true.
(Used to force failure for certain code paths.)
@@ -199,7 +202,11 @@ def monkey_connect(database=None, host=None, port=None, user=None, password=None
(See monkey_set_defaults(), which must have been called prior to this being invoked.)
"""
if FORCE_CONNECT_FAILURE:
- raise Exception("Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format(password, FORCE_CONNECT_FAILURE))
+ raise Exception(
+ "Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format(
+ password, FORCE_CONNECT_FAILURE
+ )
+ )
if database in DEFAULT_MULTI_DBINFO:
return MockConn(DEFAULT_MULTI_DBINFO[database])
diff --git a/tests/test_binding.py b/tests/test_binding.py
index 6c8b0ef..5ed6532 100644
--- a/tests/test_binding.py
+++ b/tests/test_binding.py
@@ -1,8 +1,8 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Samsung Electronics. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Samsung Electronics. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,44 +24,85 @@ import httpretty
import subprocess
import json
-MODULE_EXTENSIONS = ('.py', '.pyc', '.pyo')
+MODULE_EXTENSIONS = (".py", ".pyc", ".pyo")
#####
# MONKEYPATCHES
#####
-mr_url = 'http://mrrouter.onap.org:3904'
-intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT'
-outopic = 'POLICY-HILOTCA-EVENT-OUTPUT'
+mr_url = "http://mrrouter.onap.org:3904"
+intopic = "VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT"
+outopic = "POLICY-HILOTCA-EVENT-OUTPUT"
@pytest.fixture()
def disable_proxy(monkeypatch):
- if 'http_proxy' in os.environ:
- monkeypatch.delenv('http_proxy')
- if 'HTTP_PROXY' in os.environ:
- monkeypatch.delenv('HTTP_PROXY')
+ if "http_proxy" in os.environ:
+ monkeypatch.delenv("http_proxy")
+ if "HTTP_PROXY" in os.environ:
+ monkeypatch.delenv("HTTP_PROXY")
@httpretty.activate
def test_resolve_all(disable_proxy):
htbtmsg = '{"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}}'
- send_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
+ send_url = mr_url + "/events/" + intopic + "/DefaultGroup/1?timeout=15000"
print(send_url)
httpretty.register_uri(httpretty.GET, send_url, body=htbtmsg)
- #Use
+ # Use
response = requests.get(send_url)
print(response)
print(response.text)
- assert(response.text == htbtmsg)
- htbtmsg = json.dumps({"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}})
- send_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000'
- print("Send URL : "+send_url)
+ assert response.text == htbtmsg
+ htbtmsg = json.dumps(
+ {
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1518616063564475,
+ "sourceId": "587c14b3-72c0-4581-b5cb-6567310b9bb7",
+ "eventId": "10048640",
+ "reportingEntityId": "587c14b3-72c0-4581-b5cb-6567310b9bb7",
+ "priority": "Normal",
+ "version": 3,
+ "reportingEntityName": "TESTVM",
+ "sequence": 10048640,
+ "domain": "heartbeat",
+ "lastEpochMicrosec": 1518616063564476,
+ "eventName": "Heartbeat_vVnf",
+ "sourceName": "TESTVM",
+ "nfNamingCode": "vVNF",
+ }
+ }
+ }
+ )
+ send_url = mr_url + "/events/" + intopic + "/DefaultGroup/1?timeout=15000"
+ print("Send URL : " + send_url)
httpretty.register_uri(httpretty.GET, send_url, body=htbtmsg, content_type="application/json")
- pol_url = mr_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000'
- pol_body = json.dumps({"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}})
- print("Policy URL : "+pol_url)
- httpretty.register_uri(httpretty.POST, pol_url, body=pol_body, status=200, content_type='text/json')
+ pol_url = mr_url + "/events/" + outopic + "/DefaultGroup/1?timeout=15000"
+ pol_body = json.dumps(
+ {
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1518616063564475,
+ "sourceId": "587c14b3-72c0-4581-b5cb-6567310b9bb7",
+ "eventId": "10048640",
+ "reportingEntityId": "587c14b3-72c0-4581-b5cb-6567310b9bb7",
+ "priority": "Normal",
+ "version": 3,
+ "reportingEntityName": "TESTVM",
+ "sequence": 10048640,
+ "domain": "heartbeat",
+ "lastEpochMicrosec": 1518616063564476,
+ "eventName": "Heartbeat_vVnf",
+ "sourceName": "TESTVM",
+ "nfNamingCode": "vVNF",
+ }
+ }
+ }
+ )
+ print("Policy URL : " + pol_url)
+ httpretty.register_uri(httpretty.POST, pol_url, body=pol_body, status=200, content_type="text/json")
+
def test_full():
- p = subprocess.Popen(['./miss_htbt_service/misshtbtd.py'], stdout=subprocess.PIPE,shell=True)
+ p = subprocess.Popen(["./miss_htbt_service/misshtbtd.py"], stdout=subprocess.PIPE, shell=True)
diff --git a/tests/test_check_health.py b/tests/test_check_health.py
index d3088f9..d77beb1 100644
--- a/tests/test_check_health.py
+++ b/tests/test_check_health.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,9 +22,11 @@ from miss_htbt_service import check_health
import io
+
class MockSocket(object):
def getsockname(self):
- return 'sockname',
+ return ("sockname",)
+
class MockRequest(object):
_sock = MockSocket()
@@ -35,32 +37,40 @@ class MockRequest(object):
self._body = body
def makefile(self, *args, **kwargs):
- if args[0] == 'rb':
- if self._rqtype == 'GET':
- return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), 'utf-8'))
+ if args[0] == "rb":
+ if self._rqtype == "GET":
+ return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), "utf-8"))
else:
- return io.BytesIO(bytes("%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s" % (self._rqtype, self._path, len(self._body), self._body), 'utf-8'))
- elif args[0] == 'wb':
- return io.BytesIO(b'')
+ return io.BytesIO(
+ bytes(
+ "%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s"
+ % (self._rqtype, self._path, len(self._body), self._body),
+ "utf-8",
+ )
+ )
+ elif args[0] == "wb":
+ return io.BytesIO(b"")
else:
raise ValueError("Unknown file type to make", args, kwargs)
def sendall(self, bstr):
pass
+
class MockServer(object):
def __init__(self, rqtype, path, ip_port, Handler, body=None):
handler = Handler(MockRequest(rqtype, path, body), ip_port, self)
+
def test_check_health_get():
"""
test the check_health GET and POST handlers using a mock server
"""
- server = MockServer('GET', '/', ('0.0.0.0', 8888), check_health.GetHandler)
+ server = MockServer("GET", "/", ("0.0.0.0", 8888), check_health.GetHandler)
+
def test_check_health_post():
"""
test the check_health GET and POST handlers using a mock server
"""
- server = MockServer('POST', '/', ('0.0.0.0', 8888), check_health.GetHandler,
- '{ "health": "" }')
+ server = MockServer("POST", "/", ("0.0.0.0", 8888), check_health.GetHandler, '{ "health": "" }')
diff --git a/tests/test_get_logger.py b/tests/test_get_logger.py
index cbef9c6..a4ceea5 100644
--- a/tests/test_get_logger.py
+++ b/tests/test_get_logger.py
@@ -1,6 +1,6 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,20 +24,20 @@ log = logging.getLogger(__name__)
def test_configure_logger():
- expected_log_path = Path('./hb_logs.txt')
+ expected_log_path = Path("./hb_logs.txt")
if expected_log_path.exists():
os.remove(expected_log_path)
- get_logger.configure_logger('')
+ get_logger.configure_logger("")
log.info("hi there")
assert expected_log_path.exists()
os.remove(expected_log_path)
def test_configure_logger_with_name():
- expected_log_path = Path('./hb_htbtworker_logs.txt')
+ expected_log_path = Path("./hb_htbtworker_logs.txt")
if expected_log_path.exists():
os.remove(expected_log_path)
- get_logger.configure_logger('htbtworker')
+ get_logger.configure_logger("htbtworker")
log.info("hi there")
assert expected_log_path.exists()
os.remove(expected_log_path)
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py
index 97e61b4..11cdc4a 100644
--- a/tests/test_htbtworker.py
+++ b/tests/test_htbtworker.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ from miss_htbt_service import htbtworker
import os, tempfile, json
+
def run_test(i):
"""
read_json_file() opens the file CWD/prefix/test{j}.json and returns the json value found there
@@ -28,18 +29,20 @@ def run_test(i):
pdir = f"{prefix}{tdir.name}"
fname = f"{tdir.name}/test{j}.json"
with open(fname, "w") as fp:
- json.dump({ "test": i }, fp)
- assert(os.path.isfile(f"{tdir.name}/test{j}.json"))
- assert(os.path.isfile(f"{pdir}/test{j}.json"))
+ json.dump({"test": i}, fp)
+ assert os.path.isfile(f"{tdir.name}/test{j}.json")
+ assert os.path.isfile(f"{pdir}/test{j}.json")
cfg = htbtworker.read_json_file(i, prefix=pdir)
- assert(cfg["test"] == i)
+ assert cfg["test"] == i
+
def test_read_json_file_0():
run_test(0)
+
def test_read_json_file_1():
run_test(1)
+
def test_read_json_file_2():
run_test(2)
-
diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py
index 35b7111..8803b29 100644
--- a/tests/test_trapd_exit.py
+++ b/tests/test_trapd_exit.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,8 +20,9 @@ import pytest
import unittest
from miss_htbt_service.mod import trapd_exit
-pid_file="/tmp/test_pid_file"
-pid_file_dne="/tmp/test_pid_file_NOT"
+pid_file = "/tmp/test_pid_file"
+pid_file_dne = "/tmp/test_pid_file_NOT"
+
class test_cleanup_and_exit(unittest.TestCase):
"""
@@ -32,10 +33,10 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
Test normal exit works as expected
"""
- open(pid_file, 'w')
+ open(pid_file, "w")
with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- result = trapd_exit.cleanup_and_exit(0,pid_file)
+ result = trapd_exit.cleanup_and_exit(0, pid_file)
assert pytest_wrapped_sys_exit.type == SystemExit
assert pytest_wrapped_sys_exit.value.code == 0
@@ -44,6 +45,6 @@ class test_cleanup_and_exit(unittest.TestCase):
Test exit with missing PID file exits non-zero
"""
with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- result = trapd_exit.cleanup_and_exit(0,pid_file_dne)
+ result = trapd_exit.cleanup_and_exit(0, pid_file_dne)
assert pytest_wrapped_sys_exit.type == SystemExit
assert pytest_wrapped_sys_exit.value.code == 1
diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py
index 6cebef2..ffb9bfb 100644
--- a/tests/test_trapd_get_cbs_config.py
+++ b/tests/test_trapd_get_cbs_config.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,19 +22,19 @@ import os
from miss_htbt_service.mod import trapd_get_cbs_config
+
class test_get_cbs_config(unittest.TestCase):
"""
Test the trapd_get_cbs_config mod
"""
- pytest_json_data ="{ \"heartbeat_config\": { \"vnfs\": [{ \"eventName\": \"Heartbeat_vDNS\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" }, { \"eventName\": \"Heartbeat_vFW\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" }, { \"eventName\": \"Heartbeat_xx\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" } ] }, \"streams_publishes\": { \"ves_heartbeat\": { \"dmaap_info\": { \"topic_url\": \"http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/\" }, \"type\": \"message_router\" } }, \"streams_subscribes\": { \"ves_heartbeat\": { \"dmaap_info\": { \"topic_url\": \"http://message-router:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT/\" }, \"type\": \"message_router\" } } }"
+ pytest_json_data = '{ "heartbeat_config": { "vnfs": [{ "eventName": "Heartbeat_vDNS", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" }, { "eventName": "Heartbeat_vFW", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" }, { "eventName": "Heartbeat_xx", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" } ] }, "streams_publishes": { "ves_heartbeat": { "dmaap_info": { "topic_url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/" }, "type": "message_router" } }, "streams_subscribes": { "ves_heartbeat": { "dmaap_info": { "topic_url": "http://message-router:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT/" }, "type": "message_router" } } }'
# create copy of snmptrapd.json for pytest
pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json"
- with open(pytest_json_config, 'w') as outfile:
+ with open(pytest_json_config, "w") as outfile:
outfile.write(pytest_json_data)
-
def test_cbs_env_present(self):
"""
Test that CONSUL_HOST env variable exists but fails to
@@ -45,13 +45,12 @@ class test_get_cbs_config(unittest.TestCase):
result = trapd_get_cbs_config.get_cbs_config()
assert pytest_wrapped_sys_exit.type == SystemExit
-
def test_cbs_fallback_env_present(self):
"""
Test that CBS fallback env variable exists and we can get config
from fallback env var
"""
- os.environ.update(CBS_HTBT_JSON='/tmp/opt/app/miss_htbt_service/etc/config.json')
+ os.environ.update(CBS_HTBT_JSON="/tmp/opt/app/miss_htbt_service/etc/config.json")
result = True
print("result: %s" % result)
self.assertEqual(result, True)
diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py
index 47380fc..070fc93 100644
--- a/tests/test_trapd_http_session.py
+++ b/tests/test_trapd_http_session.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import unittest
from miss_htbt_service.mod import trapd_http_session
+
class test_init_session_obj(unittest.TestCase):
"""
Test the init_session_obj mod
diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py
index f31c4db..47bc642 100644
--- a/tests/test_trapd_runtime_pid.py
+++ b/tests/test_trapd_runtime_pid.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import unittest
from miss_htbt_service.mod import trapd_runtime_pid
+
class test_save_pid(unittest.TestCase):
"""
Test the save_pid mod
@@ -28,16 +29,17 @@ class test_save_pid(unittest.TestCase):
"""
Test that attempt to create pid file in standard location works
"""
- result = trapd_runtime_pid.save_pid('/tmp/snmptrap_test_pid_file')
+ result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
self.assertEqual(result, True)
def test_missing_directory(self):
"""
Test that attempt to create pid file in missing dir fails
"""
- result = trapd_runtime_pid.save_pid('/bogus/directory/for/snmptrap_test_pid_file')
+ result = trapd_runtime_pid.save_pid("/bogus/directory/for/snmptrap_test_pid_file")
self.assertEqual(result, False)
+
class test_rm_pid(unittest.TestCase):
"""
Test the rm_pid mod
@@ -48,14 +50,14 @@ class test_rm_pid(unittest.TestCase):
Test that attempt to remove pid file in standard location works
"""
# must create it before removing it
- result = trapd_runtime_pid.save_pid('/tmp/snmptrap_test_pid_file')
+ result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
self.assertEqual(result, True)
- result = trapd_runtime_pid.rm_pid('/tmp/snmptrap_test_pid_file')
+ result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file")
self.assertEqual(result, True)
def test_missing_file(self):
"""
Test that attempt to rm non-existent pid file fails
"""
- result = trapd_runtime_pid.rm_pid('/tmp/snmptrap_test_pid_file_9999')
+ result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999")
self.assertEqual(result, False)
diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py
index d800119..f3aebdd 100644
--- a/tests/test_trapd_settings.py
+++ b/tests/test_trapd_settings.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,8 +20,8 @@ import unittest
from miss_htbt_service.mod import trapd_settings as tds
-pid_file="/tmp/test_pid_file"
-pid_file_dne="/tmp/test_pid_file_NOT"
+pid_file = "/tmp/test_pid_file"
+pid_file_dne = "/tmp/test_pid_file_NOT"
class test_cleanup_and_exit(unittest.TestCase):
@@ -29,7 +29,6 @@ class test_cleanup_and_exit(unittest.TestCase):
Test for presense of required vars
"""
-
def test_nonexistent_dict(self):
"""
Test nosuch var
diff --git a/tests/test_trapd_vnf_table.py b/tests/test_trapd_vnf_table.py
index e8ef5d0..b88b4d5 100644
--- a/tests/test_trapd_vnf_table.py
+++ b/tests/test_trapd_vnf_table.py
@@ -1,10 +1,8 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,12 +24,20 @@ test_trapd_vnf_table contains test cases related to DB Tables and cbs polling.
import logging
import unittest
from mod.trapd_vnf_table import (
- verify_DB_creation_1, verify_DB_creation_2, verify_DB_creation_hb_common,
- hb_properties, verify_cbspolling,
- verify_sendControlLoop_VNF_ONSET, verify_sendControlLoop_VM_ONSET,
- verify_sendControlLoop_VNF_ABATED, verify_sendControlLoop_VM_ABATED,
- verify_fetch_json_file, verify_misshtbtdmain, verify_dbmonitoring,
- verify_dbmon_startup)
+ verify_DB_creation_1,
+ verify_DB_creation_2,
+ verify_DB_creation_hb_common,
+ hb_properties,
+ verify_cbspolling,
+ verify_sendControlLoop_VNF_ONSET,
+ verify_sendControlLoop_VM_ONSET,
+ verify_sendControlLoop_VNF_ABATED,
+ verify_sendControlLoop_VM_ABATED,
+ verify_fetch_json_file,
+ verify_misshtbtdmain,
+ verify_dbmonitoring,
+ verify_dbmon_startup,
+)
_logger = logging.getLogger(__name__)
@@ -40,19 +46,20 @@ class test_vnf_tables(unittest.TestCase):
"""
Test the DB Creation
"""
+
global ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties()
def test_validate_vnf_table_1(self):
- result =verify_DB_creation_1(user_name,password,ip_address,port_num,db_name)
+ result = verify_DB_creation_1(user_name, password, ip_address, port_num, db_name)
self.assertEqual(result, True)
def test_validate_vnf_table_2(self):
- result =verify_DB_creation_2(user_name,password,ip_address,port_num,db_name)
+ result = verify_DB_creation_2(user_name, password, ip_address, port_num, db_name)
self.assertEqual(result, True)
def test_validate_hb_common(self):
- result =verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name)
+ result = verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name)
self.assertEqual(result, True)
def test_cbspolling(self):
@@ -60,41 +67,41 @@ class test_vnf_tables(unittest.TestCase):
verify_cbspolling()
def test_fetch_json_file(self):
- result= verify_fetch_json_file()
+ result = verify_fetch_json_file()
_logger.info(result)
self.assertEqual(result, True)
def test_misshtbtdmain(self):
- result= verify_misshtbtdmain()
+ result = verify_misshtbtdmain()
_logger.info(result)
self.assertEqual(result, True)
def test_dbmon_startup(self):
- result= verify_dbmon_startup()
+ result = verify_dbmon_startup()
_logger.info(result)
self.assertEqual(result, True)
def test_dbmonitoring(self):
- result= verify_dbmonitoring()
+ result = verify_dbmonitoring()
_logger.info(result)
self.assertEqual(result, True)
def test_sendControlLoop_VNF_ONSET(self):
- result= verify_sendControlLoop_VNF_ONSET()
+ result = verify_sendControlLoop_VNF_ONSET()
_logger.info(result)
self.assertEqual(result, True)
def test_sendControlLoop_VM_ONSET(self):
- result= verify_sendControlLoop_VM_ONSET()
+ result = verify_sendControlLoop_VM_ONSET()
_logger.info(result)
self.assertEqual(result, True)
def test_sendControlLoop_VNF_ABATED(self):
- result= verify_sendControlLoop_VNF_ABATED()
+ result = verify_sendControlLoop_VNF_ABATED()
_logger.info(result)
self.assertEqual(result, True)
def test_sendControlLoop_VM_ABATED(self):
- result= verify_sendControlLoop_VM_ABATED()
+ result = verify_sendControlLoop_VM_ABATED()
_logger.info(result)
self.assertEqual(result, True)