From 2108563705a2ec8bb80029d36122c69fa4d06df5 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 1 Dec 2021 22:01:56 +0000 Subject: run the black formatting tool on python code also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) --- Changelog.md | 1 + miss_htbt_service/__init__.py | 5 +- miss_htbt_service/cbs_polling.py | 24 ++- miss_htbt_service/check_health.py | 47 ++--- miss_htbt_service/db_monitoring.py | 243 +++++++++++++++--------- miss_htbt_service/get_logger.py | 10 +- miss_htbt_service/htbtworker.py | 114 +++++++----- miss_htbt_service/misshtbtd.py | 257 +++++++++++++++++--------- miss_htbt_service/mod/trapd_exit.py | 11 +- miss_htbt_service/mod/trapd_get_cbs_config.py | 32 ++-- miss_htbt_service/mod/trapd_http_session.py | 6 +- miss_htbt_service/mod/trapd_io.py | 23 ++- miss_htbt_service/mod/trapd_runtime_pid.py | 10 +- miss_htbt_service/mod/trapd_settings.py | 6 +- miss_htbt_service/mod/trapd_vnf_table.py | 193 ++++++++++++------- setup.py | 20 +- tests/monkey_psycopg2.py | 27 ++- tests/test_binding.py | 85 ++++++--- tests/test_check_health.py | 34 ++-- tests/test_get_logger.py | 12 +- tests/test_htbtworker.py | 15 +- tests/test_trapd_exit.py | 15 +- tests/test_trapd_get_cbs_config.py | 13 +- tests/test_trapd_http_session.py | 5 +- tests/test_trapd_runtime_pid.py | 16 +- tests/test_trapd_settings.py | 9 +- tests/test_trapd_vnf_table.py | 51 ++--- 27 files changed, 788 insertions(+), 496 deletions(-) diff --git a/Changelog.md b/Changelog.md index 5aeff31..2ecfd88 100644 --- a/Changelog.md +++ b/Changelog.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [2.4.0] - 2021/10/12 ### Changed - [DCAEGEN2-2939] Removed unused code (config\_notif.py) +- [DCAEGEN2-2995] run the black formatting tool on python code ### Fixed - [DCAEGEN2-2832] Pod become unready state - [DCAEGEN2-2872] No such file or directory error and stop working diff --git a/miss_htbt_service/__init__.py b/miss_htbt_service/__init__.py index 1ae08ca..a56d536 100644 --- a/miss_htbt_service/__init__.py +++ b/miss_htbt_service/__init__.py @@ -1,5 +1,5 @@ -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# ============LICENSE_START======================================================= +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,4 +16,3 @@ # empty __init__.py so that pytest can add correct path to coverage report, -- per pytest # best practice guideline - diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py index e7bdef1..53585ba 100644 --- a/miss_htbt_service/cbs_polling.py +++ b/miss_htbt_service/cbs_polling.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,7 +35,15 @@ _logger = logging.getLogger(__name__) def poll_cbs(current_pid: int) -> None: jsfile = db.fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) msg = "CBSP:Main process ID in hb_common is %d", hbc_pid _logger.info(msg) @@ -43,13 +51,13 @@ def poll_cbs(current_pid: int) -> None: _logger.info(msg) msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval _logger.info(msg) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": cbs_polling_interval = "30" time.sleep(int(cbs_polling_interval)) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) if current_pid == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING": _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION") state = "RECONFIGURATION" @@ -60,7 +68,7 @@ def poll_cbs(current_pid: int) -> None: def cbs_polling_loop(current_pid: int): - get_logger.configure_logger('cbs_polling') + get_logger.configure_logger("cbs_polling") while True: poll_cbs(current_pid) diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py index 4d04210..71f7c58 100644 --- a/miss_htbt_service/check_health.py +++ b/miss_htbt_service/check_health.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,38 +22,39 @@ from urllib import parse class GetHandler(BaseHTTPRequestHandler): - def do_GET(self): parsed_path = parse.urlparse(self.path) - message = '\n'.join([ - 'CLIENT VALUES:', - 'client_address=%s (%s)' % (self.client_address, self.address_string()), - 'command=%s' % self.command, - 'path=%s' % self.path, - 'real path=%s' % parsed_path.path, - 'query=%s' % parsed_path.query, - 'request_version=%s' % self.request_version, - '', - 'SERVER VALUES:', - 'server_version=%s' % self.server_version, - 'sys_version=%s' % self.sys_version, - 'protocol_version=%s' % self.protocol_version, - '', - ]) + message = "\n".join( + [ + "CLIENT VALUES:", + "client_address=%s (%s)" % (self.client_address, self.address_string()), + "command=%s" % self.command, + "path=%s" % self.path, + "real path=%s" % parsed_path.path, + "query=%s" % parsed_path.query, + "request_version=%s" % self.request_version, + "", + "SERVER VALUES:", + "server_version=%s" % self.server_version, + "sys_version=%s" % self.sys_version, + "protocol_version=%s" % self.protocol_version, + "", + ] + ) self.send_response(200) self.end_headers() - self.wfile.write(bytes(message, 'utf-8')) + self.wfile.write(bytes(message, "utf-8")) return def do_POST(self): - content_len = int(self.headers.get('content-length', 0)) + content_len = int(self.headers.get("content-length", 0)) post_body = self.rfile.read(content_len) self.send_response(200) self.end_headers() data = json.loads(post_body) - self.wfile.write(bytes(data['health'], 'utf-8')) + self.wfile.write(bytes(data["health"], "utf-8")) return @@ -61,9 +62,9 @@ def start_health_check_server() -> None: from http.server import HTTPServer server = HTTPServer(("", 10002), GetHandler) - print('Starting server at http://localhost:10002') + print("Starting server at http://localhost:10002") server.serve_forever() -if __name__ == '__main__': +if __name__ == "__main__": start_health_check_server() diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index a405876..32a2d5c 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,81 +37,100 @@ import get_logger _logger = logging.getLogger(__name__) -def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, - closed_control_loop_name, version, target): +def sendControlLoopEvent( + CLType, + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, +): msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type _logger.info(msg) if CLType == "ONSET": _logger.info("DBM:Heartbeat not received, raising alarm event") if target_type == "VNF": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"generic-vnf.vnf-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) elif target_type == "VM": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"vserver.vserver-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) else: return True elif CLType == "ABATED": _logger.info("DBM:Heartbeat received, clearing alarm event") # last_date_time = datetime.datetime.now() if target_type == "VNF": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"generic-vnf.vnf-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) elif target_type == "VM": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"vserver.vserver-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) else: return True else: @@ -131,7 +150,7 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_sc msg = "DBM:Status code for sending the control loop event is", ret _logger.info(msg) except Exception as err: - msg = 'Message send failure : ', err + msg = "Message send failure : ", err _logger.error(msg) return True @@ -140,22 +159,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ while True: time.sleep(20) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": break try: - with open(json_file, 'r') as outfile: + with open(json_file, "r") as outfile: cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + pol_url = str(cfg["streams_publishes"]["dcae_cl_out"]["dmaap_info"]["topic_url"]) except Exception as err: - msg = 'Json file process error : ', err + msg = "Json file process error : ", err _logger.error(msg) continue - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING": @@ -170,9 +191,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " - "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " - "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,)) + cur.execute( + "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " + "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " + "target, version FROM vnf_table_1 WHERE event_name = %s", + (event_name,), + ) rows = cur.fetchall() validity_flag = rows[0][0] source_name_count = rows[0][1] @@ -189,8 +213,11 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ if validity_flag == 1: for source_name_key in range(source_name_count): epoc_time = int(round(time.time() * 1000)) - cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " - "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1))) + cur.execute( + "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " + "event_name = %s AND source_name_key = %s", + (event_name, (source_name_key + 1)), + ) row = cur.fetchall() if len(row) == 0: continue @@ -198,20 +225,44 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ srcName = row[0][1] cl_flag = row[0][2] if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: - sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, - target_type, srcName, epoc_time, closed_control_loop_name, version, - target) + sendControlLoopEvent( + "ONSET", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, + ) cl_flag = 1 - cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " - "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) connection_db.commit() elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: - sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, - target_type, srcName, epoc_time, closed_control_loop_name, version, - target) + sendControlLoopEvent( + "ABATED", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, + ) cl_flag = 0 - cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " - "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) connection_db.commit() else: # pragma: no cover @@ -233,15 +284,23 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ if __name__ == "__main__": - get_logger.configure_logger('db_monitoring') + get_logger.configure_logger("db_monitoring") _logger.info("DBM: DBM Process started") current_pid = sys.argv[1] jsfile = sys.argv[2] - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) msg = "DBM:Parent process ID and json file name", current_pid, jsfile _logger.info(msg) while True: db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": break diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py index 55286eb..8068f6b 100644 --- a/miss_htbt_service/get_logger.py +++ b/miss_htbt_service/get_logger.py @@ -1,6 +1,6 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import logging.handlers LOG_LEVEL = logging.DEBUG -LOG_FORMAT = '%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s' +LOG_FORMAT = "%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s" LOG_MAXSIZE = 10485760 * 5 LOG_BACKUP_COUNT = 10 @@ -39,9 +39,9 @@ def configure_logger(proc_name: str) -> None: # Add rotating log file handler if proc_name: - logfile_path = './hb_%s_logs.txt' % proc_name + logfile_path = "./hb_%s_logs.txt" % proc_name else: - logfile_path = './hb_logs.txt' + logfile_path = "./hb_logs.txt" fhandler = logging.handlers.RotatingFileHandler(logfile_path, maxBytes=LOG_MAXSIZE, backupCount=LOG_BACKUP_COUNT) fhandler.setFormatter(formatter) root.addHandler(fhandler) diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index be1b6aa..44436a2 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"): with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: cfg = json.load(outfile) elif i == 2: - with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: + with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile: cfg = json.load(outfile) return cfg @@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): sleep_duration = 20 while True: time.sleep(sleep_duration) - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) + mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) if hbc_state == "RECONFIGURATION": _logger.info("HBT:Waiting for hb_common state to become RUNNING") time.sleep(10) else: break - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] connection_db = 0 else: @@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: - get_url = mr_url + '/DefaultGroup/1?timeout=15000' + get_url = mr_url + "/DefaultGroup/1?timeout=15000" else: - get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' + get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000" msg = "HBT:Getting :" + get_url _logger.info(msg) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jsonobj = read_json_file(i) jobj = [] jobj.append(jsonobj) @@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): # If mrstatus in message body indicates some information, not json msg. if "mrstatus" in inputString: continue - jlist = inputString.split('\n') + jlist = inputString.split("\n") # Process the DMaaP input message retreived error = False for line in jlist: try: jobj = json.loads(line) except ValueError: - msg = 'HBT:Decoding JSON has failed' + msg = "HBT:Decoding JSON has failed" _logger.error(msg) error = True break @@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): continue for item in jobj: try: - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jitem = jsonobj else: jitem = json.loads(item) - srcname = (jitem['event']['commonEventHeader']['sourceName']) - lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec']) + srcname = jitem["event"]["commonEventHeader"]["sourceName"] + lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] # if lastEpochMicrosec looks like microsec, align it with millisec if lastepo > 1000000000000000: lastepo = int(lastepo / 1000) - seqnum = (jitem['event']['commonEventHeader']['sequence']) - eventName = (jitem['event']['commonEventHeader']['eventName']) + seqnum = jitem["event"]["commonEventHeader"]["sequence"] + eventName = jitem["event"]["commonEventHeader"]["eventName"] except Exception as err: msg = "HBT message process error - ", err _logger.error(msg) @@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_table_creation_check(connection_db, "vnf_table_2") is False: msg = "HBT:Creating vnf_table_2" _logger.info(msg) - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_2 ( EVENT_NAME varchar, SOURCE_NAME_KEY integer, @@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer - )""") + )""" + ) else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) if eventName in eventnameList: # pragma: no cover - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": break cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,)) row = cur.fetchone() @@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): cl_flag = 0 if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", + (source_name_key, eventName), + ) else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " - "source_name_key = %s", (eventName, (source_name_key + 1))) + cur.execute( + "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", + (eventName, (source_name_key + 1)), + ) row = cur.fetchall() if len(row) == 0: continue @@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_srcname == srcname: msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " - "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - (lastepo, srcname, eventName, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " + "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", + (lastepo, srcname, eventName, (source_name_key + 1)), + ) source_name_key = source_name_count break else: @@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if source_name_count == (source_name_key + 1): source_name_key = source_name_count + 1 _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + (source_name_key, eventName), + ) else: _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) commit_and_close_db(connection_db) - if os.getenv('pytest', "") != 'test': + if os.getenv("pytest", "") != "test": cur.close() def postgres_db_open(username, password, host, port, database_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) return connection def db_table_creation_check(connection_db, table_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: cur = connection_db.cursor() @@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name): else: return False except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) finally: cur.close() def commit_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False def commit_and_close_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False -if __name__ == '__main__': - get_logger.configure_logger('htbtworker') +if __name__ == "__main__": + get_logger.configure_logger("htbtworker") jsfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") msg = "HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) process_msg(jsfile, user_name, password, ip_address, port_num, db_name) diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 6be2260..5ba0860 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -2,9 +2,9 @@ # ============LICENSE_START======================================================= # Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json" def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): from psycopg2 import connect + try: con = connect(user=user_name, host=ip_address, password=password) database_name = db_name @@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password def read_hb_common(user_name, password, ip_address, port_num, db_name): - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": hbc_pid = 10 hbc_srcName = "srvc_name" hbc_time = 1584595881 @@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name): current_time = int(round(time.time())) source_name = socket.gethostname() - source_name = source_name + "-" + os.getenv('SERVICE_NAME', "") - envPytest = os.getenv('pytest', "") - if envPytest != 'test': + source_name = source_name + "-" + os.getenv("SERVICE_NAME", "") + envPytest = os.getenv("pytest", "") + if envPytest != "test": connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "hb_common") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE hb_common ( PROCESS_ID integer primary key, SOURCE_NAME varchar, LAST_ACCESSED_TIME integer, CURRENT_STATE varchar - )""") + )""" + ) cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state)) _logger.info("MSHBT:Created hb_common DB and updated new values") elif update_flg == 1: - cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " - "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name)) + cur.execute( + "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " + "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", + (current_time, state, process_id, source_name), + ) _logger.info("MSHBT:Updated hb_common DB with new values") heartbeat.commit_and_close_db(connection_db) cur.close() def create_update_vnf_table_1(jsfile, update_db, connection_db): - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - hbcfg = cfg['heartbeat_config'] + hbcfg = cfg["heartbeat_config"] jhbcfg = json.loads(hbcfg) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] else: cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_1 ( EVENT_NAME varchar primary key, HEARTBEAT_MISSED_COUNT integer, @@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): VERSION varchar, SOURCE_NAME_COUNT integer, VALIDITY_FLAG integer - )""") + )""" + ) _logger.info("MSHBT:Created vnf_table_1 table") if update_db == 1: cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1") @@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): # Put some initial values into the queue cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] - for vnf in (jhbcfg['vnfs']): - nfc = vnf['eventName'] + for vnf in jhbcfg["vnfs"]: + nfc = vnf["eventName"] validity_flag = 1 source_name_count = 0 - missed = vnf['heartbeatcountmissed'] - intvl = vnf['heartbeatinterval'] - clloop = vnf['closedLoopControlName'] - policyVersion = vnf['policyVersion'] - policyName = vnf['policyName'] - policyScope = vnf['policyScope'] - target_type = vnf['target_type'] - target = vnf['target'] - version = vnf['version'] - - if envPytest == 'test': + missed = vnf["heartbeatcountmissed"] + intvl = vnf["heartbeatinterval"] + clloop = vnf["closedLoopControlName"] + policyVersion = vnf["policyVersion"] + policyName = vnf["policyName"] + policyScope = vnf["policyScope"] + target_type = vnf["target_type"] + target = vnf["target"] + version = vnf["version"] + + if envPytest == "test": # skip executing SQL in test continue if nfc not in vnf_list: - cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", - (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, - version, source_name_count, validity_flag)) + cur.execute( + "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ( + nfc, + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + source_name_count, + validity_flag, + ), + ) _logger.debug("Inserted new event_name = %s into vnf_table_1", nfc) else: - cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, + cur.execute( + """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s, TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""", - (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, - validity_flag, nfc)) - if envPytest != 'test': + ( + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + validity_flag, + nfc, + ), + ) + if envPytest != "test": cur.close() _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") @@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile): def read_hb_properties_default(): # Read the hbproperties.yaml for postgress and CBS related data - s = open(hb_properties_file, 'r') + s = open(hb_properties_file, "r") a = yaml.full_load(s) - if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None): - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] + if ( + (os.getenv("pg_ipAddress") is None) + or (os.getenv("pg_portNum") is None) + or (os.getenv("pg_userName") is None) + or (os.getenv("pg_passwd") is None) + ): + ip_address = a["pg_ipAddress"] + port_num = a["pg_portNum"] + user_name = a["pg_userName"] + password = a["pg_passwd"] else: - ip_address = os.getenv('pg_ipAddress') - port_num = os.getenv('pg_portNum') - user_name = os.getenv('pg_userName') - password = os.getenv('pg_passwd') + ip_address = os.getenv("pg_ipAddress") + port_num = os.getenv("pg_portNum") + user_name = os.getenv("pg_userName") + password = os.getenv("pg_passwd") - dbName = a['pg_dbName'] + dbName = a["pg_dbName"] db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] + cbs_polling_required = a["CBS_polling_allowed"] + cbs_polling_interval = a["CBS_polling_interval"] s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval def read_hb_properties(jsfile): try: - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) except Exception as err: msg = "CBS Json file load error - " + str(err) @@ -242,20 +282,20 @@ def read_hb_properties(jsfile): return read_hb_properties_default() try: - ip_address = str(cfg['pg_ipAddress']) - port_num = str(cfg['pg_portNum']) - user_name = str(cfg['pg_userName']) - password = str(cfg['pg_passwd']) - dbName = str(cfg['pg_dbName']) + ip_address = str(cfg["pg_ipAddress"]) + port_num = str(cfg["pg_portNum"]) + user_name = str(cfg["pg_userName"]) + password = str(cfg["pg_passwd"]) + dbName = str(cfg["pg_dbName"]) db_name = dbName.lower() - cbs_polling_required = str(cfg['CBS_polling_allowed']) - cbs_polling_interval = str(cfg['CBS_polling_interval']) - consumer_id = str(cfg['consumerID']) - group_id = str(cfg['groupID']) - os.environ['consumerID'] = consumer_id - os.environ['groupID'] = group_id + cbs_polling_required = str(cfg["CBS_polling_allowed"]) + cbs_polling_interval = str(cfg["CBS_polling_interval"]) + consumer_id = str(cfg["consumerID"]) + group_id = str(cfg["groupID"]) + os.environ["consumerID"] = consumer_id + os.environ["groupID"] = group_id if "SERVICE_NAME" in cfg: - os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"]) except Exception as err: msg = "CBS Json file read parameter error - " + str(err) _logger.error(msg) @@ -273,7 +313,7 @@ def fetch_json_file() -> str: # Try to get config from CBS. If succeeded, config json is stored to tds.c_config . if get_cbs_config(): # Save config to temporary file - with tempfile.NamedTemporaryFile('w', delete=False) as temp: + with tempfile.NamedTemporaryFile("w", delete=False) as temp: _logger.info("MSHBD: New config saved to temp file %s", temp.name) json.dump(tds.c_config, temp) # Swap current config with downloaded config @@ -289,8 +329,8 @@ def fetch_json_file() -> str: def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): - envPytest = os.getenv('pytest', "") - if envPytest != 'test': # pragma: no cover + envPytest = os.getenv("pytest", "") + if envPytest != "test": # pragma: no cover if update_db == 0: create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name @@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current): if len(job_list) == 0: p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) time.sleep(1) - p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,)) + p2 = multiprocessing.Process( + target=db_monitoring_process, + args=( + pid_current, + jsfile, + ), + ) p1.start() time.sleep(1) p2.start() @@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current): def main(): - get_logger.configure_logger('misshtbtd') + get_logger.configure_logger("misshtbtd") pid_current = os.getpid() hc_proc = multiprocessing.Process(target=check_health.start_health_check_server) cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,)) @@ -334,15 +380,32 @@ def main(): job_list = [] jsfile = fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) - msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = read_hb_properties(jsfile) + msg = ( + "MSHBT:HB Properties -", + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) _logger.info(msg) update_db = 0 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) state = "RECONFIGURATION" update_flg = 0 create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) - if cbs_polling_required == 'True': + if cbs_polling_required == "True": # note: cbs_polling process must be started after `hb_common` table created cbs_polling_proc.start() _logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid) @@ -350,17 +413,27 @@ def main(): _logger.info("MSHBD:Now be in a continuous loop") i = 0 while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time _logger.info(msg) current_time = int(round(time.time())) time_difference = current_time - hbc_time - msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference + msg = ( + "MSHBD:pid,srcName,state,time,ctime,timeDiff is", + hbc_pid, + hbc_srcName, + hbc_state, + hbc_time, + current_time, + time_difference, + ) _logger.info(msg) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) + envPytest = os.getenv("pytest", "") + if envPytest == "test": if i == 2: hbc_pid = pid_current source_name = hbc_srcName @@ -376,9 +449,13 @@ def main(): if hbc_state == "RUNNING": state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) elif hbc_state == "RECONFIGURATION": - _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + _logger.info( + "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes" + ) jsfile = fetch_json_file() update_db = 1 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) @@ -387,7 +464,9 @@ def main(): job_list = create_process(job_list, jsfile, pid_current) state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) else: _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") @@ -395,7 +474,11 @@ def main(): _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") else: jsfile = fetch_json_file() - msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current + msg = ( + "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", + jsfile, + pid_current, + ) _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) else: @@ -409,13 +492,17 @@ def main(): msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) update_flg = 1 - create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name + ) else: _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") time.sleep(25) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": i = i + 1 if i > 5: _logger.info("Terminating main process for pytest") @@ -451,5 +538,5 @@ def main(): cbs_polling_proc.join() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py index c741fe5..aae12bb 100644 --- a/miss_htbt_service/mod/trapd_exit.py +++ b/miss_htbt_service/mod/trapd_exit.py @@ -1,9 +1,7 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,7 +21,7 @@ trapc_exit_snmptrapd is responsible for removing any existing runtime PID file, and exiting with the provided (param 1) exit code """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import sys import os @@ -62,6 +60,7 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name): rc = rm_pid(_pid_file_name) sys.exit(_loc_exit_code) + # # # # # # # # # # # # # # fx: cleanup_and_exit # - remove pid file @@ -89,5 +88,3 @@ def cleanup(_loc_exit_code, _pid_file_name): if _pid_file_name is not None: rc = rm_pid(_pid_file_name) - - diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py index 70dac53..f9fd4c3 100644 --- a/miss_htbt_service/mod/trapd_get_cbs_config.py +++ b/miss_htbt_service/mod/trapd_get_cbs_config.py @@ -1,10 +1,8 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ # Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,7 +23,7 @@ env variable that specifies JSON equiv of CBS config (typically used for testing purposes) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import json import os @@ -36,7 +34,7 @@ import traceback import collections.abc from onap_dcae_cbs_docker_client.client import get_config from mod import trapd_settings as tds -from mod.trapd_exit import cleanup,cleanup_and_exit +from mod.trapd_exit import cleanup, cleanup_and_exit from mod.trapd_io import stdout_logger prog_name = os.path.basename(__file__) @@ -75,46 +73,44 @@ def get_cbs_config(): _cbs_sim_json_file = os.getenv("CBS_HTBT_JSON", "None") except Exception as e: stdout_logger(msg) - cleanup(1,None) + cleanup(1, None) return False msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" if _cbs_sim_json_file == "None": stdout_logger(msg) - cleanup(1,None) + cleanup(1, None) return False else: - msg = ("ONAP controller override specified via CBS_HTBT_JSON: %s" % - _cbs_sim_json_file) + msg = "ONAP controller override specified via CBS_HTBT_JSON: %s" % _cbs_sim_json_file stdout_logger(msg) - msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \ - " (invalid json?) - FATAL ERROR, exiting" + msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting" try: tds.c_config = json.load(open(_cbs_sim_json_file)) except Exception as e: stdout_logger(msg) - cleanup_and_exit(0,None) + cleanup_and_exit(0, None) # recalc timeout, set default if not present try: - tds.timeout_seconds = tds.c_config['publisher.http_timeout_milliseconds'] / 1000.0 + tds.timeout_seconds = tds.c_config["publisher.http_timeout_milliseconds"] / 1000.0 except Exception as e: tds.timeout_seconds = 1.5 # recalc seconds_between_retries, set default if not present try: - tds.seconds_between_retries = tds.c_config['publisher.http_milliseconds_between_retries'] / 1000.0 + tds.seconds_between_retries = tds.c_config["publisher.http_milliseconds_between_retries"] / 1000.0 except Exception as e: - tds.seconds_between_retries = .750 + tds.seconds_between_retries = 0.750 # recalc min_severity_to_log, set default if not present try: - tds.minimum_severity_to_log = tds.c_config['files.minimum_severity_to_log'] + tds.minimum_severity_to_log = tds.c_config["files.minimum_severity_to_log"] except Exception as e: tds.minimum_severity_to_log = 3 try: - tds.publisher_retries = tds.c_config['publisher.http_retries'] + tds.publisher_retries = tds.c_config["publisher.http_retries"] except Exception as e: tds.publisher_retries = 3 diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py index 0d7f220..fc7e865 100644 --- a/miss_htbt_service/mod/trapd_http_session.py +++ b/miss_htbt_service/mod/trapd_http_session.py @@ -1,7 +1,5 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ trapd_http_session establishes an http session for future use in publishing messages to the dmaap cluster. """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import os import requests diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py index d60f060..3e03999 100644 --- a/miss_htbt_service/mod/trapd_io.py +++ b/miss_htbt_service/mod/trapd_io.py @@ -1,9 +1,7 @@ # ============LICENSE_START=======================================================) -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" # basics import datetime @@ -36,6 +34,7 @@ import string import time import traceback import unicodedata + # dcae_snmptrap import mod.trapd_settings as tds from mod.trapd_exit import cleanup_and_exit @@ -48,7 +47,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def roll_all_logs(): +# def roll_all_logs(): # """ # roll all active logs to timestamped version, open new one # based on frequency defined in files.roll_frequency @@ -107,7 +106,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def open_eelf_logs(): +# def open_eelf_logs(): # """ # open various (multiple ???) logs # """ @@ -159,7 +158,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def roll_file(_loc_file_name): +# def roll_file(_loc_file_name): # """ # move active file to timestamped archive # """ @@ -190,7 +189,7 @@ prog_name = os.path.basename(__file__) ## # # # # # # # # # # # # # # -#def open_file(_loc_file_name): +# def open_file(_loc_file_name): # """ # open _loc_file_name, return file handle # """ @@ -214,7 +213,7 @@ prog_name = os.path.basename(__file__) # """ # # -#def close_file(_loc_fd, _loc_filename): +# def close_file(_loc_fd, _loc_filename): # # try: # @@ -231,7 +230,7 @@ prog_name = os.path.basename(__file__) ## is released for python via LOG-161 ## # # # # # # # # # ## # # # # # # # # -#def ecomp_logger(_log_type, _sev, _error_code, _msg): +# def ecomp_logger(_log_type, _sev, _error_code, _msg): # """ # Log to ecomp-style logfiles. Logs include: # @@ -389,4 +388,4 @@ def stdout_logger(_msg): t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3] - print('%s %s' % (t_out, _msg)) + print("%s %s" % (t_out, _msg)) diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py index d2db83d..47612bd 100644 --- a/miss_htbt_service/mod/trapd_runtime_pid.py +++ b/miss_htbt_service/mod/trapd_runtime_pid.py @@ -1,7 +1,5 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ trapd_runtime_pid maintains a 'PID file' (file that contains the PID of currently running trap receiver) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import logging import os @@ -50,8 +48,8 @@ def save_pid(_pid_file_name): """ try: - pid_fd = open(_pid_file_name, 'w') - pid_fd.write('%d' % os.getpid()) + pid_fd = open(_pid_file_name, "w") + pid_fd.write("%d" % os.getpid()) pid_fd.close() except IOError: print("IOError saving PID file %s :" % _pid_file_name) diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py index 0c8457f..3e7f141 100644 --- a/miss_htbt_service/mod/trapd_settings.py +++ b/miss_htbt_service/mod/trapd_settings.py @@ -1,7 +1,5 @@ # ============LICENSE_START=======================================================) -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +17,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" def init(): diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py index db6bd17..1b00336 100644 --- a/miss_htbt_service/mod/trapd_vnf_table.py +++ b/miss_htbt_service/mod/trapd_vnf_table.py @@ -1,11 +1,9 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -44,45 +42,47 @@ _logger = logging.getLogger(__name__) def hb_properties(): - #Read the hbproperties.yaml for postgress and CBS related data - s=open(hb_properties_file, 'r') - a=yaml.full_load(s) - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] - dbName = a['pg_dbName'] - db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] - s.close() - return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval - - -def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + # Read the hbproperties.yaml for postgress and CBS related data + s = open(hb_properties_file, "r") + a = yaml.full_load(s) + ip_address = a["pg_ipAddress"] + port_num = a["pg_portNum"] + user_name = a["pg_userName"] + password = a["pg_passwd"] + dbName = a["pg_dbName"] + db_name = dbName.lower() + cbs_polling_required = a["CBS_polling_allowed"] + cbs_polling_interval = a["CBS_polling_interval"] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + + +def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name): + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1") + _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1") except Exception as e: return None return _db_status -def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) +def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name): + + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2") + _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2") except Exception as e: return None return _db_status -def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) +def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name): + + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"hb_common") + _db_status = pm.db_table_creation_check(connection_db, "hb_common") except Exception as e: return None @@ -90,35 +90,36 @@ def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name) def verify_cbspolling(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" cbs.poll_cbs(10) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") def verify_fetch_json_file(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: - db.fetch_json_file() - result = True + db.fetch_json_file() + result = True except Exception as e: - result = False + result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_misshtbtdmain(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: db.main() @@ -126,69 +127,125 @@ def verify_misshtbtdmain(): except Exception as e: result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_dbmonitoring(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: jsfile = db.fetch_json_file() ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - dbmon.db_monitoring(hbc_pid,jsfile,user_name,password,ip_address,port_num,db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) + dbmon.db_monitoring(hbc_pid, jsfile, user_name, password, ip_address, port_num, db_name) result = True except Exception as e: print("Message process error - %s" % e) result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_dbmon_startup(): try: - p = subprocess.Popen(['./miss_htbt_service/db_monitoring.py'], stdout=subprocess.PIPE,shell=True) + p = subprocess.Popen(["./miss_htbt_service/db_monitoring.py"], stdout=subprocess.PIPE, shell=True) time.sleep(1) except Exception as e: return None return True + def verify_sendControlLoop_VNF_ONSET(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ONSET", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VNF", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VM_ONSET(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ONSET", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VM", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VNF_ABATED(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ABATED", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VNF", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VM_ABATED(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ABATED", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VM", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return diff --git a/setup.py b/setup.py index 22af9ab..297d168 100644 --- a/setup.py +++ b/setup.py @@ -33,10 +33,10 @@ from setuptools import setup, find_packages ## from pip.download import PipSession setup( - name='miss_htbt_service', - description='Missing heartbeat microservice to communicate with policy-engine', - version='2.4.0', - #packages=find_packages(exclude=["tests.*", "tests"]), + name="miss_htbt_service", + description="Missing heartbeat microservice to communicate with policy-engine", + version="2.4.0", + # packages=find_packages(exclude=["tests.*", "tests"]), packages=find_packages(), install_requires=[ "requests==2.23.0", @@ -47,12 +47,12 @@ setup( "HTTPretty==1.0.5", "pyOpenSSL==20.0.1", "Wheel==0.36.2", - "psycopg2-binary==2.8.6" + "psycopg2-binary==2.8.6", ], - author = "Vijay Venkatesh Kumar", - author_email = "vv770d@att.com", - license = "", - keywords = "missing heartbeat microservice", - url = "https://gerrit.onap.org/r/#/admin/projects/dcaegen2/platform/heartbeat", + author="Vijay Venkatesh Kumar", + author_email="vv770d@att.com", + license="", + keywords="missing heartbeat microservice", + url="https://gerrit.onap.org/r/#/admin/projects/dcaegen2/platform/heartbeat", zip_safe=False, ) diff --git a/tests/monkey_psycopg2.py b/tests/monkey_psycopg2.py index 05f44b4..e993874 100644 --- a/tests/monkey_psycopg2.py +++ b/tests/monkey_psycopg2.py @@ -1,6 +1,5 @@ # ============LICENSE_START==================================================== -# ============================================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ============================================================================= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,14 +33,15 @@ FORCE_CLOSE_FAILURE = False # This variable is used by monkey_psycopg2.monkey_set_defaults(multiDbInfo) # to set up default values to be returned by cursors statements. -DEFAULT_MULTI_DBINFO = { } +DEFAULT_MULTI_DBINFO = {} + class MockConn(object): """ mock Connection interface returned by psycopg2.connect() """ - def __init__(self, dbInfo = None): + def __init__(self, dbInfo=None): self.curCmd = None self.curCursor = None self.monkey_setDbInfo(dbInfo) @@ -49,7 +49,7 @@ class MockConn(object): def monkey_setDbInfo(self, dbInfo): """ Set up a set of defaults for the cursors on "select" statements. - The outer scope is a string that will be matched against the currently-active + The outer scope is a string that will be matched against the currently-active select statement being executed. If there is a match, the specified values are returned by the cursor. dbconn.monkey_setDbInfo({ @@ -71,7 +71,7 @@ class MockConn(object): """ if FORCE_COMMIT_FAILURE: raise psycopg2.DatabaseError(f"Unable to commit: force_commit_failure=<{FORCE_COMMIT_FAILURE}>") - + def close(self): """ mock close @@ -110,7 +110,7 @@ class MockConn(object): self.curCursor = None return self - def execute(self, cmd, args = None): + def execute(self, cmd, args=None): """ mock execute @@ -155,6 +155,7 @@ class MockConn(object): print(f"fetchall() returning {self.curCursor}") return self.curCursor + def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False, close=False): print(f"monkey_reset_forces({connect}, {cursor}, {execute}, {commit}, {close})") global FORCE_CONNECT_FAILURE @@ -168,11 +169,12 @@ def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False global FORCE_CLOSE_FAILURE FORCE_CLOSE_FAILURE = close + def monkey_set_defaults(multiDbInfo): """ Set up a set of defaults for the cursors on "select" statements. The outer scope gives a database name. - The next level is a string that will be matched against the currently-active + The next level is a string that will be matched against the currently-active select statement being executed. If both match, the specified values are returned by the cursor. monkey_psycopg2.monkey_set_defaults({ @@ -187,10 +189,11 @@ def monkey_set_defaults(multiDbInfo): global DEFAULT_MULTI_DBINFO DEFAULT_MULTI_DBINFO = multiDbInfo + def monkey_connect(database=None, host=None, port=None, user=None, password=None): """ Mock psycopg2 connection. - Returns a mock connection. + Returns a mock connection. Will raise an exception if value of 'FORCE_CONNECT_FAILURE' is true. (Used to force failure for certain code paths.) @@ -199,7 +202,11 @@ def monkey_connect(database=None, host=None, port=None, user=None, password=None (See monkey_set_defaults(), which must have been called prior to this being invoked.) """ if FORCE_CONNECT_FAILURE: - raise Exception("Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format(password, FORCE_CONNECT_FAILURE)) + raise Exception( + "Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format( + password, FORCE_CONNECT_FAILURE + ) + ) if database in DEFAULT_MULTI_DBINFO: return MockConn(DEFAULT_MULTI_DBINFO[database]) diff --git a/tests/test_binding.py b/tests/test_binding.py index 6c8b0ef..5ed6532 100644 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -1,8 +1,8 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,44 +24,85 @@ import httpretty import subprocess import json -MODULE_EXTENSIONS = ('.py', '.pyc', '.pyo') +MODULE_EXTENSIONS = (".py", ".pyc", ".pyo") ##### # MONKEYPATCHES ##### -mr_url = 'http://mrrouter.onap.org:3904' -intopic = 'VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT' -outopic = 'POLICY-HILOTCA-EVENT-OUTPUT' +mr_url = "http://mrrouter.onap.org:3904" +intopic = "VESCOLL-VNFNJ-SECHEARTBEAT-OUTPUT" +outopic = "POLICY-HILOTCA-EVENT-OUTPUT" @pytest.fixture() def disable_proxy(monkeypatch): - if 'http_proxy' in os.environ: - monkeypatch.delenv('http_proxy') - if 'HTTP_PROXY' in os.environ: - monkeypatch.delenv('HTTP_PROXY') + if "http_proxy" in os.environ: + monkeypatch.delenv("http_proxy") + if "HTTP_PROXY" in os.environ: + monkeypatch.delenv("HTTP_PROXY") @httpretty.activate def test_resolve_all(disable_proxy): htbtmsg = '{"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}}' - send_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' + send_url = mr_url + "/events/" + intopic + "/DefaultGroup/1?timeout=15000" print(send_url) httpretty.register_uri(httpretty.GET, send_url, body=htbtmsg) - #Use + # Use response = requests.get(send_url) print(response) print(response.text) - assert(response.text == htbtmsg) - htbtmsg = json.dumps({"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}}) - send_url = mr_url+'/events/'+intopic+'/DefaultGroup/1?timeout=15000' - print("Send URL : "+send_url) + assert response.text == htbtmsg + htbtmsg = json.dumps( + { + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1518616063564475, + "sourceId": "587c14b3-72c0-4581-b5cb-6567310b9bb7", + "eventId": "10048640", + "reportingEntityId": "587c14b3-72c0-4581-b5cb-6567310b9bb7", + "priority": "Normal", + "version": 3, + "reportingEntityName": "TESTVM", + "sequence": 10048640, + "domain": "heartbeat", + "lastEpochMicrosec": 1518616063564476, + "eventName": "Heartbeat_vVnf", + "sourceName": "TESTVM", + "nfNamingCode": "vVNF", + } + } + } + ) + send_url = mr_url + "/events/" + intopic + "/DefaultGroup/1?timeout=15000" + print("Send URL : " + send_url) httpretty.register_uri(httpretty.GET, send_url, body=htbtmsg, content_type="application/json") - pol_url = mr_url+'/events/'+outopic+'/DefaultGroup/1?timeout=15000' - pol_body = json.dumps({"event":{"commonEventHeader":{"startEpochMicrosec":1518616063564475,"sourceId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","eventId":"10048640","reportingEntityId":"587c14b3-72c0-4581-b5cb-6567310b9bb7","priority":"Normal","version":3,"reportingEntityName":"TESTVM","sequence":10048640,"domain":"heartbeat","lastEpochMicrosec":1518616063564476,"eventName":"Heartbeat_vVnf","sourceName":"TESTVM","nfNamingCode":"vVNF"}}}) - print("Policy URL : "+pol_url) - httpretty.register_uri(httpretty.POST, pol_url, body=pol_body, status=200, content_type='text/json') + pol_url = mr_url + "/events/" + outopic + "/DefaultGroup/1?timeout=15000" + pol_body = json.dumps( + { + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1518616063564475, + "sourceId": "587c14b3-72c0-4581-b5cb-6567310b9bb7", + "eventId": "10048640", + "reportingEntityId": "587c14b3-72c0-4581-b5cb-6567310b9bb7", + "priority": "Normal", + "version": 3, + "reportingEntityName": "TESTVM", + "sequence": 10048640, + "domain": "heartbeat", + "lastEpochMicrosec": 1518616063564476, + "eventName": "Heartbeat_vVnf", + "sourceName": "TESTVM", + "nfNamingCode": "vVNF", + } + } + } + ) + print("Policy URL : " + pol_url) + httpretty.register_uri(httpretty.POST, pol_url, body=pol_body, status=200, content_type="text/json") + def test_full(): - p = subprocess.Popen(['./miss_htbt_service/misshtbtd.py'], stdout=subprocess.PIPE,shell=True) + p = subprocess.Popen(["./miss_htbt_service/misshtbtd.py"], stdout=subprocess.PIPE, shell=True) diff --git a/tests/test_check_health.py b/tests/test_check_health.py index d3088f9..d77beb1 100644 --- a/tests/test_check_health.py +++ b/tests/test_check_health.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,9 +22,11 @@ from miss_htbt_service import check_health import io + class MockSocket(object): def getsockname(self): - return 'sockname', + return ("sockname",) + class MockRequest(object): _sock = MockSocket() @@ -35,32 +37,40 @@ class MockRequest(object): self._body = body def makefile(self, *args, **kwargs): - if args[0] == 'rb': - if self._rqtype == 'GET': - return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), 'utf-8')) + if args[0] == "rb": + if self._rqtype == "GET": + return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), "utf-8")) else: - return io.BytesIO(bytes("%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s" % (self._rqtype, self._path, len(self._body), self._body), 'utf-8')) - elif args[0] == 'wb': - return io.BytesIO(b'') + return io.BytesIO( + bytes( + "%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s" + % (self._rqtype, self._path, len(self._body), self._body), + "utf-8", + ) + ) + elif args[0] == "wb": + return io.BytesIO(b"") else: raise ValueError("Unknown file type to make", args, kwargs) def sendall(self, bstr): pass + class MockServer(object): def __init__(self, rqtype, path, ip_port, Handler, body=None): handler = Handler(MockRequest(rqtype, path, body), ip_port, self) + def test_check_health_get(): """ test the check_health GET and POST handlers using a mock server """ - server = MockServer('GET', '/', ('0.0.0.0', 8888), check_health.GetHandler) + server = MockServer("GET", "/", ("0.0.0.0", 8888), check_health.GetHandler) + def test_check_health_post(): """ test the check_health GET and POST handlers using a mock server """ - server = MockServer('POST', '/', ('0.0.0.0', 8888), check_health.GetHandler, - '{ "health": "" }') + server = MockServer("POST", "/", ("0.0.0.0", 8888), check_health.GetHandler, '{ "health": "" }') diff --git a/tests/test_get_logger.py b/tests/test_get_logger.py index cbef9c6..a4ceea5 100644 --- a/tests/test_get_logger.py +++ b/tests/test_get_logger.py @@ -1,6 +1,6 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,20 +24,20 @@ log = logging.getLogger(__name__) def test_configure_logger(): - expected_log_path = Path('./hb_logs.txt') + expected_log_path = Path("./hb_logs.txt") if expected_log_path.exists(): os.remove(expected_log_path) - get_logger.configure_logger('') + get_logger.configure_logger("") log.info("hi there") assert expected_log_path.exists() os.remove(expected_log_path) def test_configure_logger_with_name(): - expected_log_path = Path('./hb_htbtworker_logs.txt') + expected_log_path = Path("./hb_htbtworker_logs.txt") if expected_log_path.exists(): os.remove(expected_log_path) - get_logger.configure_logger('htbtworker') + get_logger.configure_logger("htbtworker") log.info("hi there") assert expected_log_path.exists() os.remove(expected_log_path) diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py index 97e61b4..11cdc4a 100644 --- a/tests/test_htbtworker.py +++ b/tests/test_htbtworker.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ from miss_htbt_service import htbtworker import os, tempfile, json + def run_test(i): """ read_json_file() opens the file CWD/prefix/test{j}.json and returns the json value found there @@ -28,18 +29,20 @@ def run_test(i): pdir = f"{prefix}{tdir.name}" fname = f"{tdir.name}/test{j}.json" with open(fname, "w") as fp: - json.dump({ "test": i }, fp) - assert(os.path.isfile(f"{tdir.name}/test{j}.json")) - assert(os.path.isfile(f"{pdir}/test{j}.json")) + json.dump({"test": i}, fp) + assert os.path.isfile(f"{tdir.name}/test{j}.json") + assert os.path.isfile(f"{pdir}/test{j}.json") cfg = htbtworker.read_json_file(i, prefix=pdir) - assert(cfg["test"] == i) + assert cfg["test"] == i + def test_read_json_file_0(): run_test(0) + def test_read_json_file_1(): run_test(1) + def test_read_json_file_2(): run_test(2) - diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py index 35b7111..8803b29 100644 --- a/tests/test_trapd_exit.py +++ b/tests/test_trapd_exit.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,8 +20,9 @@ import pytest import unittest from miss_htbt_service.mod import trapd_exit -pid_file="/tmp/test_pid_file" -pid_file_dne="/tmp/test_pid_file_NOT" +pid_file = "/tmp/test_pid_file" +pid_file_dne = "/tmp/test_pid_file_NOT" + class test_cleanup_and_exit(unittest.TestCase): """ @@ -32,10 +33,10 @@ class test_cleanup_and_exit(unittest.TestCase): """ Test normal exit works as expected """ - open(pid_file, 'w') + open(pid_file, "w") with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = trapd_exit.cleanup_and_exit(0,pid_file) + result = trapd_exit.cleanup_and_exit(0, pid_file) assert pytest_wrapped_sys_exit.type == SystemExit assert pytest_wrapped_sys_exit.value.code == 0 @@ -44,6 +45,6 @@ class test_cleanup_and_exit(unittest.TestCase): Test exit with missing PID file exits non-zero """ with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = trapd_exit.cleanup_and_exit(0,pid_file_dne) + result = trapd_exit.cleanup_and_exit(0, pid_file_dne) assert pytest_wrapped_sys_exit.type == SystemExit assert pytest_wrapped_sys_exit.value.code == 1 diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py index 6cebef2..ffb9bfb 100644 --- a/tests/test_trapd_get_cbs_config.py +++ b/tests/test_trapd_get_cbs_config.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,19 +22,19 @@ import os from miss_htbt_service.mod import trapd_get_cbs_config + class test_get_cbs_config(unittest.TestCase): """ Test the trapd_get_cbs_config mod """ - pytest_json_data ="{ \"heartbeat_config\": { \"vnfs\": [{ \"eventName\": \"Heartbeat_vDNS\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" }, { \"eventName\": \"Heartbeat_vFW\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" }, { \"eventName\": \"Heartbeat_xx\", \"heartbeatcountmissed\": 3, \"heartbeatinterval\": 60, \"closedLoopControlName\": \"ControlLoopEvent1\", \"policyVersion\": \"1.0.0.5\", \"policyName\": \"vFireWall\", \"policyScope\": \"resource=sampleResource,type=sampletype,CLName=sampleCLName\", \"target_type\": \"VNF\", \"target\": \"genVnfName\", \"version\": \"1.0\" } ] }, \"streams_publishes\": { \"ves_heartbeat\": { \"dmaap_info\": { \"topic_url\": \"http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/\" }, \"type\": \"message_router\" } }, \"streams_subscribes\": { \"ves_heartbeat\": { \"dmaap_info\": { \"topic_url\": \"http://message-router:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT/\" }, \"type\": \"message_router\" } } }" + pytest_json_data = '{ "heartbeat_config": { "vnfs": [{ "eventName": "Heartbeat_vDNS", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" }, { "eventName": "Heartbeat_vFW", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" }, { "eventName": "Heartbeat_xx", "heartbeatcountmissed": 3, "heartbeatinterval": 60, "closedLoopControlName": "ControlLoopEvent1", "policyVersion": "1.0.0.5", "policyName": "vFireWall", "policyScope": "resource=sampleResource,type=sampletype,CLName=sampleCLName", "target_type": "VNF", "target": "genVnfName", "version": "1.0" } ] }, "streams_publishes": { "ves_heartbeat": { "dmaap_info": { "topic_url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/" }, "type": "message_router" } }, "streams_subscribes": { "ves_heartbeat": { "dmaap_info": { "topic_url": "http://message-router:3904/events/unauthenticated.SEC_HEARTBEAT_INPUT/" }, "type": "message_router" } } }' # create copy of snmptrapd.json for pytest pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json" - with open(pytest_json_config, 'w') as outfile: + with open(pytest_json_config, "w") as outfile: outfile.write(pytest_json_data) - def test_cbs_env_present(self): """ Test that CONSUL_HOST env variable exists but fails to @@ -45,13 +45,12 @@ class test_get_cbs_config(unittest.TestCase): result = trapd_get_cbs_config.get_cbs_config() assert pytest_wrapped_sys_exit.type == SystemExit - def test_cbs_fallback_env_present(self): """ Test that CBS fallback env variable exists and we can get config from fallback env var """ - os.environ.update(CBS_HTBT_JSON='/tmp/opt/app/miss_htbt_service/etc/config.json') + os.environ.update(CBS_HTBT_JSON="/tmp/opt/app/miss_htbt_service/etc/config.json") result = True print("result: %s" % result) self.assertEqual(result, True) diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py index 47380fc..070fc93 100644 --- a/tests/test_trapd_http_session.py +++ b/tests/test_trapd_http_session.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import unittest from miss_htbt_service.mod import trapd_http_session + class test_init_session_obj(unittest.TestCase): """ Test the init_session_obj mod diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py index f31c4db..47bc642 100644 --- a/tests/test_trapd_runtime_pid.py +++ b/tests/test_trapd_runtime_pid.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import unittest from miss_htbt_service.mod import trapd_runtime_pid + class test_save_pid(unittest.TestCase): """ Test the save_pid mod @@ -28,16 +29,17 @@ class test_save_pid(unittest.TestCase): """ Test that attempt to create pid file in standard location works """ - result = trapd_runtime_pid.save_pid('/tmp/snmptrap_test_pid_file') + result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") self.assertEqual(result, True) def test_missing_directory(self): """ Test that attempt to create pid file in missing dir fails """ - result = trapd_runtime_pid.save_pid('/bogus/directory/for/snmptrap_test_pid_file') + result = trapd_runtime_pid.save_pid("/bogus/directory/for/snmptrap_test_pid_file") self.assertEqual(result, False) + class test_rm_pid(unittest.TestCase): """ Test the rm_pid mod @@ -48,14 +50,14 @@ class test_rm_pid(unittest.TestCase): Test that attempt to remove pid file in standard location works """ # must create it before removing it - result = trapd_runtime_pid.save_pid('/tmp/snmptrap_test_pid_file') + result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") self.assertEqual(result, True) - result = trapd_runtime_pid.rm_pid('/tmp/snmptrap_test_pid_file') + result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file") self.assertEqual(result, True) def test_missing_file(self): """ Test that attempt to rm non-existent pid file fails """ - result = trapd_runtime_pid.rm_pid('/tmp/snmptrap_test_pid_file_9999') + result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999") self.assertEqual(result, False) diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py index d800119..f3aebdd 100644 --- a/tests/test_trapd_settings.py +++ b/tests/test_trapd_settings.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ import unittest from miss_htbt_service.mod import trapd_settings as tds -pid_file="/tmp/test_pid_file" -pid_file_dne="/tmp/test_pid_file_NOT" +pid_file = "/tmp/test_pid_file" +pid_file_dne = "/tmp/test_pid_file_NOT" class test_cleanup_and_exit(unittest.TestCase): @@ -29,7 +29,6 @@ class test_cleanup_and_exit(unittest.TestCase): Test for presense of required vars """ - def test_nonexistent_dict(self): """ Test nosuch var diff --git a/tests/test_trapd_vnf_table.py b/tests/test_trapd_vnf_table.py index e8ef5d0..b88b4d5 100644 --- a/tests/test_trapd_vnf_table.py +++ b/tests/test_trapd_vnf_table.py @@ -1,10 +1,8 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,12 +24,20 @@ test_trapd_vnf_table contains test cases related to DB Tables and cbs polling. import logging import unittest from mod.trapd_vnf_table import ( - verify_DB_creation_1, verify_DB_creation_2, verify_DB_creation_hb_common, - hb_properties, verify_cbspolling, - verify_sendControlLoop_VNF_ONSET, verify_sendControlLoop_VM_ONSET, - verify_sendControlLoop_VNF_ABATED, verify_sendControlLoop_VM_ABATED, - verify_fetch_json_file, verify_misshtbtdmain, verify_dbmonitoring, - verify_dbmon_startup) + verify_DB_creation_1, + verify_DB_creation_2, + verify_DB_creation_hb_common, + hb_properties, + verify_cbspolling, + verify_sendControlLoop_VNF_ONSET, + verify_sendControlLoop_VM_ONSET, + verify_sendControlLoop_VNF_ABATED, + verify_sendControlLoop_VM_ABATED, + verify_fetch_json_file, + verify_misshtbtdmain, + verify_dbmonitoring, + verify_dbmon_startup, +) _logger = logging.getLogger(__name__) @@ -40,19 +46,20 @@ class test_vnf_tables(unittest.TestCase): """ Test the DB Creation """ + global ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() def test_validate_vnf_table_1(self): - result =verify_DB_creation_1(user_name,password,ip_address,port_num,db_name) + result = verify_DB_creation_1(user_name, password, ip_address, port_num, db_name) self.assertEqual(result, True) def test_validate_vnf_table_2(self): - result =verify_DB_creation_2(user_name,password,ip_address,port_num,db_name) + result = verify_DB_creation_2(user_name, password, ip_address, port_num, db_name) self.assertEqual(result, True) def test_validate_hb_common(self): - result =verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name) + result = verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name) self.assertEqual(result, True) def test_cbspolling(self): @@ -60,41 +67,41 @@ class test_vnf_tables(unittest.TestCase): verify_cbspolling() def test_fetch_json_file(self): - result= verify_fetch_json_file() + result = verify_fetch_json_file() _logger.info(result) self.assertEqual(result, True) def test_misshtbtdmain(self): - result= verify_misshtbtdmain() + result = verify_misshtbtdmain() _logger.info(result) self.assertEqual(result, True) def test_dbmon_startup(self): - result= verify_dbmon_startup() + result = verify_dbmon_startup() _logger.info(result) self.assertEqual(result, True) def test_dbmonitoring(self): - result= verify_dbmonitoring() + result = verify_dbmonitoring() _logger.info(result) self.assertEqual(result, True) def test_sendControlLoop_VNF_ONSET(self): - result= verify_sendControlLoop_VNF_ONSET() + result = verify_sendControlLoop_VNF_ONSET() _logger.info(result) self.assertEqual(result, True) def test_sendControlLoop_VM_ONSET(self): - result= verify_sendControlLoop_VM_ONSET() + result = verify_sendControlLoop_VM_ONSET() _logger.info(result) self.assertEqual(result, True) def test_sendControlLoop_VNF_ABATED(self): - result= verify_sendControlLoop_VNF_ABATED() + result = verify_sendControlLoop_VNF_ABATED() _logger.info(result) self.assertEqual(result, True) def test_sendControlLoop_VM_ABATED(self): - result= verify_sendControlLoop_VM_ABATED() + result = verify_sendControlLoop_VM_ABATED() _logger.info(result) self.assertEqual(result, True) -- cgit 1.2.3-korg