diff options
author | Hansen, Tony (th1395) <th1395@att.com> | 2021-12-01 22:01:56 +0000 |
---|---|---|
committer | Hansen, Tony (th1395) <th1395@att.com> | 2021-12-02 19:58:31 +0000 |
commit | 2108563705a2ec8bb80029d36122c69fa4d06df5 (patch) | |
tree | 1453b42bc3535635b136d0963a290243e26ae35f /miss_htbt_service | |
parent | 8d7c0201456b7f9af6e91fea90354f4c3de323fe (diff) |
run the black formatting tool on python code
also fix up some copyright & license block lines
Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605
Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Issue-ID: DCAEGEN2-2995
Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Diffstat (limited to 'miss_htbt_service')
-rw-r--r-- | miss_htbt_service/__init__.py | 5 | ||||
-rw-r--r-- | miss_htbt_service/cbs_polling.py | 24 | ||||
-rw-r--r-- | miss_htbt_service/check_health.py | 47 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 243 | ||||
-rw-r--r-- | miss_htbt_service/get_logger.py | 10 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 114 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 257 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_exit.py | 11 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_get_cbs_config.py | 32 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_http_session.py | 6 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_io.py | 23 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_runtime_pid.py | 10 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_settings.py | 6 | ||||
-rw-r--r-- | miss_htbt_service/mod/trapd_vnf_table.py | 193 |
14 files changed, 601 insertions, 380 deletions
diff --git a/miss_htbt_service/__init__.py b/miss_htbt_service/__init__.py index 1ae08ca..a56d536 100644 --- a/miss_htbt_service/__init__.py +++ b/miss_htbt_service/__init__.py @@ -1,5 +1,5 @@ -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# ============LICENSE_START======================================================= +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,4 +16,3 @@ # empty __init__.py so that pytest can add correct path to coverage report, -- per pytest # best practice guideline - diff --git a/miss_htbt_service/cbs_polling.py b/miss_htbt_service/cbs_polling.py index e7bdef1..53585ba 100644 --- a/miss_htbt_service/cbs_polling.py +++ b/miss_htbt_service/cbs_polling.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,7 +35,15 @@ _logger = logging.getLogger(__name__) def poll_cbs(current_pid: int) -> None: jsfile = db.fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) msg = "CBSP:Main process ID in hb_common is %d", hbc_pid _logger.info(msg) @@ -43,13 +51,13 @@ def poll_cbs(current_pid: int) -> None: _logger.info(msg) msg = "CBSP:CBS Polling interval is %d", cbs_polling_interval _logger.info(msg) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": cbs_polling_interval = "30" time.sleep(int(cbs_polling_interval)) hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) if current_pid == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING": _logger.info("CBSP:ACTIVE Instance:Change the state to RECONFIGURATION") state = "RECONFIGURATION" @@ -60,7 +68,7 @@ def poll_cbs(current_pid: int) -> None: def cbs_polling_loop(current_pid: int): - get_logger.configure_logger('cbs_polling') + get_logger.configure_logger("cbs_polling") while True: poll_cbs(current_pid) diff --git a/miss_htbt_service/check_health.py b/miss_htbt_service/check_health.py index 4d04210..71f7c58 100644 --- a/miss_htbt_service/check_health.py +++ b/miss_htbt_service/check_health.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,38 +22,39 @@ from urllib import parse class GetHandler(BaseHTTPRequestHandler): - def do_GET(self): parsed_path = parse.urlparse(self.path) - message = '\n'.join([ - 'CLIENT VALUES:', - 'client_address=%s (%s)' % (self.client_address, self.address_string()), - 'command=%s' % self.command, - 'path=%s' % self.path, - 'real path=%s' % parsed_path.path, - 'query=%s' % parsed_path.query, - 'request_version=%s' % self.request_version, - '', - 'SERVER VALUES:', - 'server_version=%s' % self.server_version, - 'sys_version=%s' % self.sys_version, - 'protocol_version=%s' % self.protocol_version, - '', - ]) + message = "\n".join( + [ + "CLIENT VALUES:", + "client_address=%s (%s)" % (self.client_address, self.address_string()), + "command=%s" % self.command, + "path=%s" % self.path, + "real path=%s" % parsed_path.path, + "query=%s" % parsed_path.query, + "request_version=%s" % self.request_version, + "", + "SERVER VALUES:", + "server_version=%s" % self.server_version, + "sys_version=%s" % self.sys_version, + "protocol_version=%s" % self.protocol_version, + "", + ] + ) self.send_response(200) self.end_headers() - self.wfile.write(bytes(message, 'utf-8')) + self.wfile.write(bytes(message, "utf-8")) return def do_POST(self): - content_len = int(self.headers.get('content-length', 0)) + content_len = int(self.headers.get("content-length", 0)) post_body = self.rfile.read(content_len) self.send_response(200) self.end_headers() data = json.loads(post_body) - self.wfile.write(bytes(data['health'], 'utf-8')) + self.wfile.write(bytes(data["health"], "utf-8")) return @@ -61,9 +62,9 @@ def start_health_check_server() -> None: from http.server import HTTPServer server = HTTPServer(("", 10002), GetHandler) - print('Starting server at http://localhost:10002') + print("Starting server at http://localhost:10002") server.serve_forever() -if __name__ == '__main__': +if __name__ == "__main__": start_health_check_server() diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index a405876..32a2d5c 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,81 +37,100 @@ import get_logger _logger = logging.getLogger(__name__) -def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time, - closed_control_loop_name, version, target): +def sendControlLoopEvent( + CLType, + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, +): msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type _logger.info(msg) if CLType == "ONSET": _logger.info("DBM:Heartbeat not received, raising alarm event") if target_type == "VNF": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"generic-vnf.vnf-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) elif target_type == "VM": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"vserver.vserver-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ONSET", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ONSET", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) else: return True elif CLType == "ABATED": _logger.info("DBM:Heartbeat received, clearing alarm event") # last_date_time = datetime.datetime.now() if target_type == "VNF": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"generic-vnf.vnf-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"generic-vnf.vnf-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) elif target_type == "VM": - json_object = json.dumps({ - "closedLoopEventClient": "DCAE_Heartbeat_MS", - "policyVersion": policy_version, - "policyName": policy_name, - "policyScope": policy_scope, - "target_type": target_type, - "AAI": {"vserver.vserver-name": srcName}, - "closedLoopAlarmStart": epoc_time, - "closedLoopEventStatus": "ABATED", - "closedLoopControlName": closed_control_loop_name, - "version": version, - "target": target, - "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", - "from": "DCAE" - }) + json_object = json.dumps( + { + "closedLoopEventClient": "DCAE_Heartbeat_MS", + "policyVersion": policy_version, + "policyName": policy_name, + "policyScope": policy_scope, + "target_type": target_type, + "AAI": {"vserver.vserver-name": srcName}, + "closedLoopAlarmStart": epoc_time, + "closedLoopEventStatus": "ABATED", + "closedLoopControlName": closed_control_loop_name, + "version": version, + "target": target, + "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc", + "from": "DCAE", + } + ) else: return True else: @@ -131,7 +150,7 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_sc msg = "DBM:Status code for sending the control loop event is", ret _logger.info(msg) except Exception as err: - msg = 'Message send failure : ', err + msg = "Message send failure : ", err _logger.error(msg) return True @@ -140,22 +159,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ while True: time.sleep(20) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": break try: - with open(json_file, 'r') as outfile: + with open(json_file, "r") as outfile: cfg = json.load(outfile) - pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url']) + pol_url = str(cfg["streams_publishes"]["dcae_cl_out"]["dmaap_info"]["topic_url"]) except Exception as err: - msg = 'Json file process error : ', err + msg = "Json file process error : ", err _logger.error(msg) continue - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING": @@ -170,9 +191,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ _logger.info("DBM:Waiting for hb_common state to become RUNNING") break - cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " - "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " - "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,)) + cur.execute( + "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " + "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " + "target, version FROM vnf_table_1 WHERE event_name = %s", + (event_name,), + ) rows = cur.fetchall() validity_flag = rows[0][0] source_name_count = rows[0][1] @@ -189,8 +213,11 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ if validity_flag == 1: for source_name_key in range(source_name_count): epoc_time = int(round(time.time() * 1000)) - cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " - "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1))) + cur.execute( + "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " + "event_name = %s AND source_name_key = %s", + (event_name, (source_name_key + 1)), + ) row = cur.fetchall() if len(row) == 0: continue @@ -198,20 +225,44 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ srcName = row[0][1] cl_flag = row[0][2] if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: - sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope, - target_type, srcName, epoc_time, closed_control_loop_name, version, - target) + sendControlLoopEvent( + "ONSET", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, + ) cl_flag = 1 - cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " - "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) connection_db.commit() elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: - sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope, - target_type, srcName, epoc_time, closed_control_loop_name, version, - target) + sendControlLoopEvent( + "ABATED", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + srcName, + epoc_time, + closed_control_loop_name, + version, + target, + ) cl_flag = 0 - cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " - "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) connection_db.commit() else: # pragma: no cover @@ -233,15 +284,23 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ if __name__ == "__main__": - get_logger.configure_logger('db_monitoring') + get_logger.configure_logger("db_monitoring") _logger.info("DBM: DBM Process started") current_pid = sys.argv[1] jsfile = sys.argv[2] - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) msg = "DBM:Parent process ID and json file name", current_pid, jsfile _logger.info(msg) while True: db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": break diff --git a/miss_htbt_service/get_logger.py b/miss_htbt_service/get_logger.py index 55286eb..8068f6b 100644 --- a/miss_htbt_service/get_logger.py +++ b/miss_htbt_service/get_logger.py @@ -1,6 +1,6 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import logging.handlers LOG_LEVEL = logging.DEBUG -LOG_FORMAT = '%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s' +LOG_FORMAT = "%(asctime)s | %(levelname)5s | %(name)s | %(module)s | %(funcName)s | %(lineno)d | %(message)s" LOG_MAXSIZE = 10485760 * 5 LOG_BACKUP_COUNT = 10 @@ -39,9 +39,9 @@ def configure_logger(proc_name: str) -> None: # Add rotating log file handler if proc_name: - logfile_path = './hb_%s_logs.txt' % proc_name + logfile_path = "./hb_%s_logs.txt" % proc_name else: - logfile_path = './hb_logs.txt' + logfile_path = "./hb_logs.txt" fhandler = logging.handlers.RotatingFileHandler(logfile_path, maxBytes=LOG_MAXSIZE, backupCount=LOG_BACKUP_COUNT) fhandler.setFormatter(formatter) root.addHandler(fhandler) diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index be1b6aa..44436a2 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"): with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: cfg = json.load(outfile) elif i == 2: - with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: + with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile: cfg = json.load(outfile) return cfg @@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): sleep_duration = 20 while True: time.sleep(sleep_duration) - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) + mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) if hbc_state == "RECONFIGURATION": _logger.info("HBT:Waiting for hb_common state to become RUNNING") time.sleep(10) else: break - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] connection_db = 0 else: @@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: - get_url = mr_url + '/DefaultGroup/1?timeout=15000' + get_url = mr_url + "/DefaultGroup/1?timeout=15000" else: - get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' + get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000" msg = "HBT:Getting :" + get_url _logger.info(msg) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jsonobj = read_json_file(i) jobj = [] jobj.append(jsonobj) @@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): # If mrstatus in message body indicates some information, not json msg. if "mrstatus" in inputString: continue - jlist = inputString.split('\n') + jlist = inputString.split("\n") # Process the DMaaP input message retreived error = False for line in jlist: try: jobj = json.loads(line) except ValueError: - msg = 'HBT:Decoding JSON has failed' + msg = "HBT:Decoding JSON has failed" _logger.error(msg) error = True break @@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): continue for item in jobj: try: - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jitem = jsonobj else: jitem = json.loads(item) - srcname = (jitem['event']['commonEventHeader']['sourceName']) - lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec']) + srcname = jitem["event"]["commonEventHeader"]["sourceName"] + lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] # if lastEpochMicrosec looks like microsec, align it with millisec if lastepo > 1000000000000000: lastepo = int(lastepo / 1000) - seqnum = (jitem['event']['commonEventHeader']['sequence']) - eventName = (jitem['event']['commonEventHeader']['eventName']) + seqnum = jitem["event"]["commonEventHeader"]["sequence"] + eventName = jitem["event"]["commonEventHeader"]["eventName"] except Exception as err: msg = "HBT message process error - ", err _logger.error(msg) @@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_table_creation_check(connection_db, "vnf_table_2") is False: msg = "HBT:Creating vnf_table_2" _logger.info(msg) - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_2 ( EVENT_NAME varchar, SOURCE_NAME_KEY integer, @@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer - )""") + )""" + ) else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) if eventName in eventnameList: # pragma: no cover - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": break cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,)) row = cur.fetchone() @@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): cl_flag = 0 if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", + (source_name_key, eventName), + ) else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " - "source_name_key = %s", (eventName, (source_name_key + 1))) + cur.execute( + "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", + (eventName, (source_name_key + 1)), + ) row = cur.fetchall() if len(row) == 0: continue @@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_srcname == srcname: msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " - "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - (lastepo, srcname, eventName, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " + "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", + (lastepo, srcname, eventName, (source_name_key + 1)), + ) source_name_key = source_name_count break else: @@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if source_name_count == (source_name_key + 1): source_name_key = source_name_count + 1 _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + (source_name_key, eventName), + ) else: _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) commit_and_close_db(connection_db) - if os.getenv('pytest', "") != 'test': + if os.getenv("pytest", "") != "test": cur.close() def postgres_db_open(username, password, host, port, database_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) return connection def db_table_creation_check(connection_db, table_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: cur = connection_db.cursor() @@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name): else: return False except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) finally: cur.close() def commit_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False def commit_and_close_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False -if __name__ == '__main__': - get_logger.configure_logger('htbtworker') +if __name__ == "__main__": + get_logger.configure_logger("htbtworker") jsfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") msg = "HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) process_msg(jsfile, user_name, password, ip_address, port_num, db_name) diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 6be2260..5ba0860 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -2,9 +2,9 @@ # ============LICENSE_START======================================================= # Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ CONFIG_PATH = "../etc/config.json" def create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name): from psycopg2 import connect + try: con = connect(user=user_name, host=ip_address, password=password) database_name = db_name @@ -82,8 +83,8 @@ def create_database(update_db, jsfile, ip_address, port_num, user_name, password def read_hb_common(user_name, password, ip_address, port_num, db_name): - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": hbc_pid = 10 hbc_srcName = "srvc_name" hbc_time = 1584595881 @@ -105,41 +106,47 @@ def read_hb_common(user_name, password, ip_address, port_num, db_name): def create_update_hb_common(update_flg, process_id, state, user_name, password, ip_address, port_num, db_name): current_time = int(round(time.time())) source_name = socket.gethostname() - source_name = source_name + "-" + os.getenv('SERVICE_NAME', "") - envPytest = os.getenv('pytest', "") - if envPytest != 'test': + source_name = source_name + "-" + os.getenv("SERVICE_NAME", "") + envPytest = os.getenv("pytest", "") + if envPytest != "test": connection_db = heartbeat.postgres_db_open(user_name, password, ip_address, port_num, db_name) cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "hb_common") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE hb_common ( PROCESS_ID integer primary key, SOURCE_NAME varchar, LAST_ACCESSED_TIME integer, CURRENT_STATE varchar - )""") + )""" + ) cur.execute("INSERT INTO hb_common VALUES(%s, %s, %s, %s)", (process_id, source_name, current_time, state)) _logger.info("MSHBT:Created hb_common DB and updated new values") elif update_flg == 1: - cur.execute("UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " - "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", (current_time, state, process_id, source_name)) + cur.execute( + "UPDATE hb_common SET LAST_ACCESSED_TIME = %s, CURRENT_STATE = %s " + "WHERE PROCESS_ID = %s AND SOURCE_NAME = %s", + (current_time, state, process_id, source_name), + ) _logger.info("MSHBT:Updated hb_common DB with new values") heartbeat.commit_and_close_db(connection_db) cur.close() def create_update_vnf_table_1(jsfile, update_db, connection_db): - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - hbcfg = cfg['heartbeat_config'] + hbcfg = cfg["heartbeat_config"] jhbcfg = json.loads(hbcfg) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + envPytest = os.getenv("pytest", "") + if envPytest == "test": vnf_list = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] else: cur = connection_db.cursor() if heartbeat.db_table_creation_check(connection_db, "vnf_table_1") is False: - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_1 ( EVENT_NAME varchar primary key, HEARTBEAT_MISSED_COUNT integer, @@ -153,7 +160,8 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): VERSION varchar, SOURCE_NAME_COUNT integer, VALIDITY_FLAG integer - )""") + )""" + ) _logger.info("MSHBT:Created vnf_table_1 table") if update_db == 1: cur.execute("UPDATE vnf_table_1 SET VALIDITY_FLAG=0 WHERE VALIDITY_FLAG=1") @@ -161,35 +169,62 @@ def create_update_vnf_table_1(jsfile, update_db, connection_db): # Put some initial values into the queue cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] - for vnf in (jhbcfg['vnfs']): - nfc = vnf['eventName'] + for vnf in jhbcfg["vnfs"]: + nfc = vnf["eventName"] validity_flag = 1 source_name_count = 0 - missed = vnf['heartbeatcountmissed'] - intvl = vnf['heartbeatinterval'] - clloop = vnf['closedLoopControlName'] - policyVersion = vnf['policyVersion'] - policyName = vnf['policyName'] - policyScope = vnf['policyScope'] - target_type = vnf['target_type'] - target = vnf['target'] - version = vnf['version'] - - if envPytest == 'test': + missed = vnf["heartbeatcountmissed"] + intvl = vnf["heartbeatinterval"] + clloop = vnf["closedLoopControlName"] + policyVersion = vnf["policyVersion"] + policyName = vnf["policyName"] + policyScope = vnf["policyScope"] + target_type = vnf["target_type"] + target = vnf["target"] + version = vnf["version"] + + if envPytest == "test": # skip executing SQL in test continue if nfc not in vnf_list: - cur.execute("INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", - (nfc, missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, - version, source_name_count, validity_flag)) + cur.execute( + "INSERT INTO vnf_table_1 VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ( + nfc, + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + source_name_count, + validity_flag, + ), + ) _logger.debug("Inserted new event_name = %s into vnf_table_1", nfc) else: - cur.execute("""UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, + cur.execute( + """UPDATE vnf_table_1 SET HEARTBEAT_MISSED_COUNT = %s, HEARTBEAT_INTERVAL = %s, CLOSED_CONTROL_LOOP_NAME = %s, POLICY_VERSION = %s, POLICY_NAME = %s, POLICY_SCOPE = %s, TARGET_TYPE = %s, TARGET = %s, VERSION = %s, VALIDITY_FLAG = %s where EVENT_NAME = %s""", - (missed, intvl, clloop, policyVersion, policyName, policyScope, target_type, target, version, - validity_flag, nfc)) - if envPytest != 'test': + ( + missed, + intvl, + clloop, + policyVersion, + policyName, + policyScope, + target_type, + target, + version, + validity_flag, + nfc, + ), + ) + if envPytest != "test": cur.close() _logger.info("MSHBT:Updated vnf_table_1 as per the json configuration file") @@ -210,31 +245,36 @@ def db_monitoring_process(current_pid, jsfile): def read_hb_properties_default(): # Read the hbproperties.yaml for postgress and CBS related data - s = open(hb_properties_file, 'r') + s = open(hb_properties_file, "r") a = yaml.full_load(s) - if (os.getenv('pg_ipAddress') is None) or (os.getenv('pg_portNum') is None) or (os.getenv('pg_userName') is None) or (os.getenv('pg_passwd') is None): - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] + if ( + (os.getenv("pg_ipAddress") is None) + or (os.getenv("pg_portNum") is None) + or (os.getenv("pg_userName") is None) + or (os.getenv("pg_passwd") is None) + ): + ip_address = a["pg_ipAddress"] + port_num = a["pg_portNum"] + user_name = a["pg_userName"] + password = a["pg_passwd"] else: - ip_address = os.getenv('pg_ipAddress') - port_num = os.getenv('pg_portNum') - user_name = os.getenv('pg_userName') - password = os.getenv('pg_passwd') + ip_address = os.getenv("pg_ipAddress") + port_num = os.getenv("pg_portNum") + user_name = os.getenv("pg_userName") + password = os.getenv("pg_passwd") - dbName = a['pg_dbName'] + dbName = a["pg_dbName"] db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] + cbs_polling_required = a["CBS_polling_allowed"] + cbs_polling_interval = a["CBS_polling_interval"] s.close() return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval def read_hb_properties(jsfile): try: - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) except Exception as err: msg = "CBS Json file load error - " + str(err) @@ -242,20 +282,20 @@ def read_hb_properties(jsfile): return read_hb_properties_default() try: - ip_address = str(cfg['pg_ipAddress']) - port_num = str(cfg['pg_portNum']) - user_name = str(cfg['pg_userName']) - password = str(cfg['pg_passwd']) - dbName = str(cfg['pg_dbName']) + ip_address = str(cfg["pg_ipAddress"]) + port_num = str(cfg["pg_portNum"]) + user_name = str(cfg["pg_userName"]) + password = str(cfg["pg_passwd"]) + dbName = str(cfg["pg_dbName"]) db_name = dbName.lower() - cbs_polling_required = str(cfg['CBS_polling_allowed']) - cbs_polling_interval = str(cfg['CBS_polling_interval']) - consumer_id = str(cfg['consumerID']) - group_id = str(cfg['groupID']) - os.environ['consumerID'] = consumer_id - os.environ['groupID'] = group_id + cbs_polling_required = str(cfg["CBS_polling_allowed"]) + cbs_polling_interval = str(cfg["CBS_polling_interval"]) + consumer_id = str(cfg["consumerID"]) + group_id = str(cfg["groupID"]) + os.environ["consumerID"] = consumer_id + os.environ["groupID"] = group_id if "SERVICE_NAME" in cfg: - os.environ['SERVICE_NAME'] = str(cfg['SERVICE_NAME']) + os.environ["SERVICE_NAME"] = str(cfg["SERVICE_NAME"]) except Exception as err: msg = "CBS Json file read parameter error - " + str(err) _logger.error(msg) @@ -273,7 +313,7 @@ def fetch_json_file() -> str: # Try to get config from CBS. If succeeded, config json is stored to tds.c_config . if get_cbs_config(): # Save config to temporary file - with tempfile.NamedTemporaryFile('w', delete=False) as temp: + with tempfile.NamedTemporaryFile("w", delete=False) as temp: _logger.info("MSHBD: New config saved to temp file %s", temp.name) json.dump(tds.c_config, temp) # Swap current config with downloaded config @@ -289,8 +329,8 @@ def fetch_json_file() -> str: def create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name): - envPytest = os.getenv('pytest', "") - if envPytest != 'test': # pragma: no cover + envPytest = os.getenv("pytest", "") + if envPytest != "test": # pragma: no cover if update_db == 0: create_database(update_db, jsfile, ip_address, port_num, user_name, password, db_name) msg = "MSHBT: DB parameters -", ip_address, port_num, user_name, password, db_name @@ -310,7 +350,13 @@ def create_process(job_list, jsfile, pid_current): if len(job_list) == 0: p1 = multiprocessing.Process(target=hb_worker_process, args=(jsfile,)) time.sleep(1) - p2 = multiprocessing.Process(target=db_monitoring_process, args=(pid_current, jsfile,)) + p2 = multiprocessing.Process( + target=db_monitoring_process, + args=( + pid_current, + jsfile, + ), + ) p1.start() time.sleep(1) p2.start() @@ -322,7 +368,7 @@ def create_process(job_list, jsfile, pid_current): def main(): - get_logger.configure_logger('misshtbtd') + get_logger.configure_logger("misshtbtd") pid_current = os.getpid() hc_proc = multiprocessing.Process(target=check_health.start_health_check_server) cbs_polling_proc = multiprocessing.Process(target=cbs_polling.cbs_polling_loop, args=(pid_current,)) @@ -334,15 +380,32 @@ def main(): job_list = [] jsfile = fetch_json_file() - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = read_hb_properties(jsfile) - msg = "MSHBT:HB Properties -", ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = read_hb_properties(jsfile) + msg = ( + "MSHBT:HB Properties -", + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) _logger.info(msg) update_db = 0 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) state = "RECONFIGURATION" update_flg = 0 create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) - if cbs_polling_required == 'True': + if cbs_polling_required == "True": # note: cbs_polling process must be started after `hb_common` table created cbs_polling_proc.start() _logger.info("MSHBD: Started CBS polling process. PID=%d", cbs_polling_proc.pid) @@ -350,17 +413,27 @@ def main(): _logger.info("MSHBD:Now be in a continuous loop") i = 0 while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) msg = "MSHBT: hb_common values ", hbc_pid, hbc_state, hbc_srcName, hbc_time _logger.info(msg) current_time = int(round(time.time())) time_difference = current_time - hbc_time - msg = "MSHBD:pid,srcName,state,time,ctime,timeDiff is", hbc_pid, hbc_srcName, hbc_state, hbc_time, current_time, time_difference + msg = ( + "MSHBD:pid,srcName,state,time,ctime,timeDiff is", + hbc_pid, + hbc_srcName, + hbc_state, + hbc_time, + current_time, + time_difference, + ) _logger.info(msg) source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', "")) - envPytest = os.getenv('pytest', "") - if envPytest == 'test': + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) + envPytest = os.getenv("pytest", "") + if envPytest == "test": if i == 2: hbc_pid = pid_current source_name = hbc_srcName @@ -376,9 +449,13 @@ def main(): if hbc_state == "RUNNING": state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) elif hbc_state == "RECONFIGURATION": - _logger.info("MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes") + _logger.info( + "MSHBD:Reconfiguration is in progress,Starting new processes by killing the present processes" + ) jsfile = fetch_json_file() update_db = 1 create_update_db(update_db, jsfile, ip_address, port_num, user_name, password, db_name) @@ -387,7 +464,9 @@ def main(): job_list = create_process(job_list, jsfile, pid_current) state = "RUNNING" update_flg = 1 - create_update_hb_common(update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, state, user_name, password, ip_address, port_num, db_name + ) else: _logger.info("MSHBD:Inactive Instance: Process IDs are different, Keep Looping") @@ -395,7 +474,11 @@ def main(): _logger.info("MSHBD:Inactive Instance: Main and DBM thread are waiting to become ACTIVE") else: jsfile = fetch_json_file() - msg = "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", jsfile, pid_current + msg = ( + "MSHBD:Inactive Instance:Creating HB and DBM threads if not created already. The param pssed %d and %s", + jsfile, + pid_current, + ) _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) else: @@ -409,13 +492,17 @@ def main(): msg = "MSHBD: Creating HB and DBM threads. The param pssed %d and %s", jsfile, pid_current _logger.info(msg) job_list = create_process(job_list, jsfile, pid_current) - hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = read_hb_common( + user_name, password, ip_address, port_num, db_name + ) update_flg = 1 - create_update_hb_common(update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name) + create_update_hb_common( + update_flg, pid_current, hbc_state, user_name, password, ip_address, port_num, db_name + ) else: _logger.error("MSHBD:ERROR - Active instance is not updating hb_common in 60 sec - ERROR") time.sleep(25) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": i = i + 1 if i > 5: _logger.info("Terminating main process for pytest") @@ -451,5 +538,5 @@ def main(): cbs_polling_proc.join() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/miss_htbt_service/mod/trapd_exit.py b/miss_htbt_service/mod/trapd_exit.py index c741fe5..aae12bb 100644 --- a/miss_htbt_service/mod/trapd_exit.py +++ b/miss_htbt_service/mod/trapd_exit.py @@ -1,9 +1,7 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,7 +21,7 @@ trapc_exit_snmptrapd is responsible for removing any existing runtime PID file, and exiting with the provided (param 1) exit code """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import sys import os @@ -62,6 +60,7 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name): rc = rm_pid(_pid_file_name) sys.exit(_loc_exit_code) + # # # # # # # # # # # # # # fx: cleanup_and_exit # - remove pid file @@ -89,5 +88,3 @@ def cleanup(_loc_exit_code, _pid_file_name): if _pid_file_name is not None: rc = rm_pid(_pid_file_name) - - diff --git a/miss_htbt_service/mod/trapd_get_cbs_config.py b/miss_htbt_service/mod/trapd_get_cbs_config.py index 70dac53..f9fd4c3 100644 --- a/miss_htbt_service/mod/trapd_get_cbs_config.py +++ b/miss_htbt_service/mod/trapd_get_cbs_config.py @@ -1,10 +1,8 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ # Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,7 +23,7 @@ env variable that specifies JSON equiv of CBS config (typically used for testing purposes) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import json import os @@ -36,7 +34,7 @@ import traceback import collections.abc from onap_dcae_cbs_docker_client.client import get_config from mod import trapd_settings as tds -from mod.trapd_exit import cleanup,cleanup_and_exit +from mod.trapd_exit import cleanup, cleanup_and_exit from mod.trapd_io import stdout_logger prog_name = os.path.basename(__file__) @@ -75,46 +73,44 @@ def get_cbs_config(): _cbs_sim_json_file = os.getenv("CBS_HTBT_JSON", "None") except Exception as e: stdout_logger(msg) - cleanup(1,None) + cleanup(1, None) return False msg = "CBS_HTBT_JSON not defined - FATAL ERROR, exiting" if _cbs_sim_json_file == "None": stdout_logger(msg) - cleanup(1,None) + cleanup(1, None) return False else: - msg = ("ONAP controller override specified via CBS_HTBT_JSON: %s" % - _cbs_sim_json_file) + msg = "ONAP controller override specified via CBS_HTBT_JSON: %s" % _cbs_sim_json_file stdout_logger(msg) - msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + \ - " (invalid json?) - FATAL ERROR, exiting" + msg = "Unable to load CBS_HTBT_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting" try: tds.c_config = json.load(open(_cbs_sim_json_file)) except Exception as e: stdout_logger(msg) - cleanup_and_exit(0,None) + cleanup_and_exit(0, None) # recalc timeout, set default if not present try: - tds.timeout_seconds = tds.c_config['publisher.http_timeout_milliseconds'] / 1000.0 + tds.timeout_seconds = tds.c_config["publisher.http_timeout_milliseconds"] / 1000.0 except Exception as e: tds.timeout_seconds = 1.5 # recalc seconds_between_retries, set default if not present try: - tds.seconds_between_retries = tds.c_config['publisher.http_milliseconds_between_retries'] / 1000.0 + tds.seconds_between_retries = tds.c_config["publisher.http_milliseconds_between_retries"] / 1000.0 except Exception as e: - tds.seconds_between_retries = .750 + tds.seconds_between_retries = 0.750 # recalc min_severity_to_log, set default if not present try: - tds.minimum_severity_to_log = tds.c_config['files.minimum_severity_to_log'] + tds.minimum_severity_to_log = tds.c_config["files.minimum_severity_to_log"] except Exception as e: tds.minimum_severity_to_log = 3 try: - tds.publisher_retries = tds.c_config['publisher.http_retries'] + tds.publisher_retries = tds.c_config["publisher.http_retries"] except Exception as e: tds.publisher_retries = 3 diff --git a/miss_htbt_service/mod/trapd_http_session.py b/miss_htbt_service/mod/trapd_http_session.py index 0d7f220..fc7e865 100644 --- a/miss_htbt_service/mod/trapd_http_session.py +++ b/miss_htbt_service/mod/trapd_http_session.py @@ -1,7 +1,5 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ trapd_http_session establishes an http session for future use in publishing messages to the dmaap cluster. """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import os import requests diff --git a/miss_htbt_service/mod/trapd_io.py b/miss_htbt_service/mod/trapd_io.py index d60f060..3e03999 100644 --- a/miss_htbt_service/mod/trapd_io.py +++ b/miss_htbt_service/mod/trapd_io.py @@ -1,9 +1,7 @@ # ============LICENSE_START=======================================================) -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" # basics import datetime @@ -36,6 +34,7 @@ import string import time import traceback import unicodedata + # dcae_snmptrap import mod.trapd_settings as tds from mod.trapd_exit import cleanup_and_exit @@ -48,7 +47,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def roll_all_logs(): +# def roll_all_logs(): # """ # roll all active logs to timestamped version, open new one # based on frequency defined in files.roll_frequency @@ -107,7 +106,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def open_eelf_logs(): +# def open_eelf_logs(): # """ # open various (multiple ???) logs # """ @@ -159,7 +158,7 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # ## # # # # # # # -#def roll_file(_loc_file_name): +# def roll_file(_loc_file_name): # """ # move active file to timestamped archive # """ @@ -190,7 +189,7 @@ prog_name = os.path.basename(__file__) ## # # # # # # # # # # # # # # -#def open_file(_loc_file_name): +# def open_file(_loc_file_name): # """ # open _loc_file_name, return file handle # """ @@ -214,7 +213,7 @@ prog_name = os.path.basename(__file__) # """ # # -#def close_file(_loc_fd, _loc_filename): +# def close_file(_loc_fd, _loc_filename): # # try: # @@ -231,7 +230,7 @@ prog_name = os.path.basename(__file__) ## is released for python via LOG-161 ## # # # # # # # # # ## # # # # # # # # -#def ecomp_logger(_log_type, _sev, _error_code, _msg): +# def ecomp_logger(_log_type, _sev, _error_code, _msg): # """ # Log to ecomp-style logfiles. Logs include: # @@ -389,4 +388,4 @@ def stdout_logger(_msg): t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3] - print('%s %s' % (t_out, _msg)) + print("%s %s" % (t_out, _msg)) diff --git a/miss_htbt_service/mod/trapd_runtime_pid.py b/miss_htbt_service/mod/trapd_runtime_pid.py index d2db83d..47612bd 100644 --- a/miss_htbt_service/mod/trapd_runtime_pid.py +++ b/miss_htbt_service/mod/trapd_runtime_pid.py @@ -1,7 +1,5 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +19,7 @@ trapd_runtime_pid maintains a 'PID file' (file that contains the PID of currently running trap receiver) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import logging import os @@ -50,8 +48,8 @@ def save_pid(_pid_file_name): """ try: - pid_fd = open(_pid_file_name, 'w') - pid_fd.write('%d' % os.getpid()) + pid_fd = open(_pid_file_name, "w") + pid_fd.write("%d" % os.getpid()) pid_fd.close() except IOError: print("IOError saving PID file %s :" % _pid_file_name) diff --git a/miss_htbt_service/mod/trapd_settings.py b/miss_htbt_service/mod/trapd_settings.py index 0c8457f..3e7f141 100644 --- a/miss_htbt_service/mod/trapd_settings.py +++ b/miss_htbt_service/mod/trapd_settings.py @@ -1,7 +1,5 @@ # ============LICENSE_START=======================================================) -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +17,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" def init(): diff --git a/miss_htbt_service/mod/trapd_vnf_table.py b/miss_htbt_service/mod/trapd_vnf_table.py index db6bd17..1b00336 100644 --- a/miss_htbt_service/mod/trapd_vnf_table.py +++ b/miss_htbt_service/mod/trapd_vnf_table.py @@ -1,11 +1,9 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Samsung Electronics. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Samsung Electronics. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -44,45 +42,47 @@ _logger = logging.getLogger(__name__) def hb_properties(): - #Read the hbproperties.yaml for postgress and CBS related data - s=open(hb_properties_file, 'r') - a=yaml.full_load(s) - ip_address = a['pg_ipAddress'] - port_num = a['pg_portNum'] - user_name = a['pg_userName'] - password = a['pg_passwd'] - dbName = a['pg_dbName'] - db_name = dbName.lower() - cbs_polling_required = a['CBS_polling_allowed'] - cbs_polling_interval = a['CBS_polling_interval'] - s.close() - return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval - - -def verify_DB_creation_1(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) + # Read the hbproperties.yaml for postgress and CBS related data + s = open(hb_properties_file, "r") + a = yaml.full_load(s) + ip_address = a["pg_ipAddress"] + port_num = a["pg_portNum"] + user_name = a["pg_userName"] + password = a["pg_passwd"] + dbName = a["pg_dbName"] + db_name = dbName.lower() + cbs_polling_required = a["CBS_polling_allowed"] + cbs_polling_interval = a["CBS_polling_interval"] + s.close() + return ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval + + +def verify_DB_creation_1(user_name, password, ip_address, port_num, db_name): + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"vnf_table_1") + _db_status = pm.db_table_creation_check(connection_db, "vnf_table_1") except Exception as e: return None return _db_status -def verify_DB_creation_2(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) +def verify_DB_creation_2(user_name, password, ip_address, port_num, db_name): + + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"vnf_table_2") + _db_status = pm.db_table_creation_check(connection_db, "vnf_table_2") except Exception as e: return None return _db_status -def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name): - connection_db = pm.postgres_db_open(user_name,password,ip_address,port_num,db_name) +def verify_DB_creation_hb_common(user_name, password, ip_address, port_num, db_name): + + connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name) try: - _db_status=pm.db_table_creation_check(connection_db,"hb_common") + _db_status = pm.db_table_creation_check(connection_db, "hb_common") except Exception as e: return None @@ -90,35 +90,36 @@ def verify_DB_creation_hb_common(user_name,password,ip_address,port_num,db_name) def verify_cbspolling(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" cbs.poll_cbs(10) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") def verify_fetch_json_file(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: - db.fetch_json_file() - result = True + db.fetch_json_file() + result = True except Exception as e: - result = False + result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_misshtbtdmain(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: db.main() @@ -126,69 +127,125 @@ def verify_misshtbtdmain(): except Exception as e: result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_dbmonitoring(): - os.environ['pytest']='test' - os.environ['SERVICE_NAME']='mvp-dcaegen2-heartbeat-static' - os.environ['CONSUL_HOST']='localhost' - os.environ['HOSTNAME']='mvp-dcaegen2-heartbeat-static' + os.environ["pytest"] = "test" + os.environ["SERVICE_NAME"] = "mvp-dcaegen2-heartbeat-static" + os.environ["CONSUL_HOST"] = "localhost" + os.environ["HOSTNAME"] = "mvp-dcaegen2-heartbeat-static" try: jsfile = db.fetch_json_file() ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name,password,ip_address,port_num,db_name) - dbmon.db_monitoring(hbc_pid,jsfile,user_name,password,ip_address,port_num,db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) + dbmon.db_monitoring(hbc_pid, jsfile, user_name, password, ip_address, port_num, db_name) result = True except Exception as e: print("Message process error - %s" % e) result = False print(result) - os.unsetenv('pytest') - os.unsetenv('SERVICE_NAME') - os.unsetenv('CONSUL_HOST') - os.unsetenv('HOSTNAME') + os.unsetenv("pytest") + os.unsetenv("SERVICE_NAME") + os.unsetenv("CONSUL_HOST") + os.unsetenv("HOSTNAME") return result + def verify_dbmon_startup(): try: - p = subprocess.Popen(['./miss_htbt_service/db_monitoring.py'], stdout=subprocess.PIPE,shell=True) + p = subprocess.Popen(["./miss_htbt_service/db_monitoring.py"], stdout=subprocess.PIPE, shell=True) time.sleep(1) except Exception as e: return None return True + def verify_sendControlLoop_VNF_ONSET(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ONSET", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VNF", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VM_ONSET(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ONSET", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ONSET", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VM", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VNF_ABATED(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VNF", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ABATED", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VNF", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return + def verify_sendControlLoop_VM_ABATED(): try: pol_url = "http://10.12.5.252:3904/events/unauthenticated.DCAE_CL_OUTPUT/" - _CL_return = dbmon.sendControlLoopEvent("ABATED", pol_url, "1.0", "vFireWall", "pscope", "VM", "srcname1", 1541234567, "SampleCLName", "1.0", "genVnfName") + _CL_return = dbmon.sendControlLoopEvent( + "ABATED", + pol_url, + "1.0", + "vFireWall", + "pscope", + "VM", + "srcname1", + 1541234567, + "SampleCLName", + "1.0", + "genVnfName", + ) except Exception as e: return None return _CL_return |