From cf7f98a554e4b594fd531729908694bb4b211442 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Thu, 2 Dec 2021 20:42:43 +0000 Subject: run the black formatting tool on python code Change-Id: I41badbf7ea2b2cd243e9f2acd5c9f2d3ec907964 Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) --- Changelog.md | 4 +- setup.py | 30 +-- snmptrap/mod/trapd_exit.py | 4 +- snmptrap/mod/trapd_get_cbs_config.py | 23 +- snmptrap/mod/trapd_http_session.py | 7 +- snmptrap/mod/trapd_io.py | 171 ++++++++---- snmptrap/mod/trapd_runtime_pid.py | 8 +- snmptrap/mod/trapd_settings.py | 4 +- snmptrap/mod/trapd_snmpv3.py | 76 +++--- snmptrap/mod/trapd_stats_settings.py | 4 +- snmptrap/mod/trapd_stormwatch.py | 199 +++++++------- snmptrap/mod/trapd_stormwatch_settings.py | 4 +- snmptrap/mod/trapd_vb_types.py | 31 ++- snmptrap/snmptrapd.py | 432 ++++++++++++++++-------------- tests/__init__.py | 6 +- tests/snmp.setup.py | 26 +- tests/test_snmptrapd.py | 80 ++++-- tests/test_snmptrapd_send_test_trap.py | 45 ++-- tests/test_trapd_exit.py | 24 +- tests/test_trapd_get_cbs_config.py | 39 ++- tests/test_trapd_http_session.py | 20 +- tests/test_trapd_io.py | 51 ++-- tests/test_trapd_runtime_pid.py | 32 +-- tests/test_trapd_settings.py | 19 +- tests/test_trapd_snmpv3.py | 18 +- tests/test_trapd_stormwatch.py | 21 +- tests/test_trapd_stormwatch_settings.py | 17 +- tests/test_trapd_vb_types.py | 19 +- 28 files changed, 775 insertions(+), 639 deletions(-) diff --git a/Changelog.md b/Changelog.md index a6126b5..78b3a8a 100644 --- a/Changelog.md +++ b/Changelog.md @@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [2.0.6] - 2021/10/26 -* Changed [DCAEGEN2-2957] SNMP Trap collector - STDOUT complaince +### Changed +* [DCAEGEN2-2957] SNMP Trap collector - STDOUT complaince +* [DCAEGEN2-2995] run the black formatting tool on python code ## [2.0.5] - 2021/07/19 * Changed to use version 2.2.1 of pypi onap_dcae_cbs_docker_client diff --git a/setup.py b/setup.py index b1fb070..8716437 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ -# org.onap.dcae -# ================================================================================ +# ============LICENSE_START======================================================= # Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. -# Copyright 2021 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import os import string @@ -26,19 +23,14 @@ from setuptools import setup, find_packages setup( - name = "snmptrap", - description = "snmp trap receiver for ONAP docker image", - version = "2.0.6", + name="snmptrap", + description="snmp trap receiver for ONAP docker image", + version="2.0.6", packages=find_packages(), - install_requires=[ - "pysnmp==4.4.12", - "requests==2.18.3", - "onap_dcae_cbs_docker_client==2.2.1", - "pyyaml" - ], - author = "Dave L", - author_email = "dl3158@att.com", - license='Apache 2', - keywords = "", - url = "" + install_requires=["pysnmp==4.4.12", "requests==2.18.3", "onap_dcae_cbs_docker_client==2.2.1", "pyyaml"], + author="Dave L", + author_email="dl3158@att.com", + license="Apache 2", + keywords="", + url="", ) diff --git a/snmptrap/mod/trapd_exit.py b/snmptrap/mod/trapd_exit.py index 6d1ea45..1d1ea16 100644 --- a/snmptrap/mod/trapd_exit.py +++ b/snmptrap/mod/trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ trapc_exit_snmptrapd is responsible for removing any existing runtime PID file, and exiting with the provided (param 1) exit code """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import sys import os diff --git a/snmptrap/mod/trapd_get_cbs_config.py b/snmptrap/mod/trapd_get_cbs_config.py index 272aabe..cfae994 100644 --- a/snmptrap/mod/trapd_get_cbs_config.py +++ b/snmptrap/mod/trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ env variable that specifies JSON equiv of CBS config (typically used for testing purposes) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import json import os @@ -76,17 +76,14 @@ def get_cbs_config(): stdout_logger(msg) cleanup_and_exit(1, None) else: - msg = ("ONAP controller override specified via CBS_SIM_JSON: %s" % - _cbs_sim_json_file) + msg = "ONAP controller override specified via CBS_SIM_JSON: %s" % _cbs_sim_json_file stdout_logger(msg) try: tds.c_config = json.load(open(_cbs_sim_json_file)) - msg = ("%s loaded and parsed successfully" % - _cbs_sim_json_file) + msg = "%s loaded and parsed successfully" % _cbs_sim_json_file stdout_logger(msg) except Exception as e: - msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + \ - " (invalid json?) - FATAL ERROR, exiting" + msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting" stdout_logger(msg) cleanup_and_exit(1, None) @@ -96,24 +93,24 @@ def get_cbs_config(): # recalc timeout, set default if not present try: - tds.timeout_seconds = float(tds.c_config['publisher']['http_milliseconds_timeout'] / 1000.0) + tds.timeout_seconds = float(tds.c_config["publisher"]["http_milliseconds_timeout"] / 1000.0) except Exception as e: tds.timeout_seconds = float(1.5) # recalc seconds_between_retries, set default if not present try: - tds.seconds_between_retries = float(tds.c_config['publisher']['http_milliseconds_between_retries'] / 1000.0) + tds.seconds_between_retries = float(tds.c_config["publisher"]["http_milliseconds_between_retries"] / 1000.0) except Exception as e: - tds.seconds_between_retries = float(.750) + tds.seconds_between_retries = float(0.750) # recalc min_severity_to_log, set default if not present try: - tds.minimum_severity_to_log = int(tds.c_config['files']['minimum_severity_to_log']) + tds.minimum_severity_to_log = int(tds.c_config["files"]["minimum_severity_to_log"]) except Exception as e: tds.minimum_severity_to_log = int(3) try: - tds.publisher_retries = int(tds.c_config['publisher']['http_retries']) + tds.publisher_retries = int(tds.c_config["publisher"]["http_retries"]) except Exception as e: tds.publisher_retries = int(2) diff --git a/snmptrap/mod/trapd_http_session.py b/snmptrap/mod/trapd_http_session.py index 14abb21..7118d00 100644 --- a/snmptrap/mod/trapd_http_session.py +++ b/snmptrap/mod/trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ trapd_http_session establishes an http session for future use in publishing messages to the dmaap cluster. """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import os import requests @@ -81,8 +81,7 @@ def close_session_obj(_loc_http_requ_session): return True except Exception as e: msg = "Unable to close current http session - FATAL ERROR, exiting" - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) diff --git a/snmptrap/mod/trapd_io.py b/snmptrap/mod/trapd_io.py index 20b99f9..f7c48b7 100644 --- a/snmptrap/mod/trapd_io.py +++ b/snmptrap/mod/trapd_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" # basics import datetime @@ -54,8 +54,14 @@ def roll_all_logs(): # NOTE: this will go away when onap logging is standardized/available try: # open various ecomp logs - if any fails, exit - for fd in [tds.eelf_error_fd, tds.eelf_debug_fd, tds.eelf_audit_fd, - tds.eelf_metrics_fd, tds.arriving_traps_fd, tds.json_traps_fd]: + for fd in [ + tds.eelf_error_fd, + tds.eelf_debug_fd, + tds.eelf_audit_fd, + tds.eelf_metrics_fd, + tds.arriving_traps_fd, + tds.json_traps_fd, + ]: fd.close() roll_file(tds.eelf_error_file_name) @@ -80,8 +86,7 @@ def roll_all_logs(): try: tds.json_traps_fd = open_file(tds.json_traps_filename) except Exception as e: - msg = ("Error opening json_log %s : %s" % - (json_traps_filename, str(e))) + msg = "Error opening json_log %s : %s" % (json_traps_filename, str(e)) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -91,8 +96,7 @@ def roll_all_logs(): try: tds.arriving_traps_fd = open_file(tds.arriving_traps_filename) except Exception as e: - msg = ("Error opening arriving traps %s : %s" % - (arriving_traps_filename, str(e))) + msg = "Error opening arriving traps %s : %s" % (arriving_traps_filename, str(e)) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -111,8 +115,7 @@ def open_eelf_logs(): try: # open various ecomp logs - if any fails, exit - tds.eelf_error_file_name = ( - tds.c_config['files']['eelf_base_dir'] + "/" + tds.c_config['files']['eelf_error']) + tds.eelf_error_file_name = tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_error"] tds.eelf_error_fd = open_file(tds.eelf_error_file_name) except Exception as e: @@ -121,8 +124,7 @@ def open_eelf_logs(): cleanup_and_exit(1, tds.pid_file_name) try: - tds.eelf_debug_file_name = ( - tds.c_config['files']['eelf_base_dir'] + "/" + tds.c_config['files']['eelf_debug']) + tds.eelf_debug_file_name = tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_debug"] tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name) except Exception as e: @@ -131,8 +133,7 @@ def open_eelf_logs(): cleanup_and_exit(1, tds.pid_file_name) try: - tds.eelf_audit_file_name = ( - tds.c_config['files']['eelf_base_dir'] + "/" + tds.c_config['files']['eelf_audit']) + tds.eelf_audit_file_name = tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_audit"] tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name) except Exception as e: msg = "Error opening eelf audit log : " + str(e) @@ -141,7 +142,8 @@ def open_eelf_logs(): try: tds.eelf_metrics_file_name = ( - tds.c_config['files']['eelf_base_dir'] + "/" + tds.c_config['files']['eelf_metrics']) + tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_metrics"] + ) tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name) except Exception as e: msg = "Error opening eelf metric log : " + str(e) @@ -150,6 +152,7 @@ def open_eelf_logs(): return True + # # # # # # # # # # # # # # # # # # # # fx: roll_log_file -> move provided filename to timestamped version # # # # # # # # # # ## # # # # # # # @@ -160,11 +163,11 @@ def roll_file(_loc_file_name): move active file to timestamped archive """ - _file_name_suffix = "%s" % (datetime.datetime.fromtimestamp(time.time()). - fromtimestamp(time.time()). - strftime('%Y-%m-%dT%H:%M:%S')) + _file_name_suffix = "%s" % ( + datetime.datetime.fromtimestamp(time.time()).fromtimestamp(time.time()).strftime("%Y-%m-%dT%H:%M:%S") + ) - _loc_file_name_bak = _loc_file_name + '.' + _file_name_suffix + _loc_file_name_bak = _loc_file_name + "." + _file_name_suffix # roll existing file if present if os.path.isfile(_loc_file_name): @@ -172,15 +175,13 @@ def roll_file(_loc_file_name): os.rename(_loc_file_name, _loc_file_name_bak) return True except Exception as e: - _msg = ("ERROR: Unable to rename %s to %s" - % (_loc_file_name, - _loc_file_name_bak)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, - tds.CODE_GENERAL, _msg) + _msg = "ERROR: Unable to rename %s to %s" % (_loc_file_name, _loc_file_name_bak) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, _msg) return False return False + # # # # # # # # # # # # # # fx: open_log_file # # # # # # # # # # # # # @@ -194,17 +195,16 @@ def open_file(_loc_file_name): try: # open append mode just in case so nothing is lost, but should be # non-existent file - _loc_fd = open(_loc_file_name, 'a', 1) + _loc_fd = open(_loc_file_name, "a", 1) return _loc_fd except Exception as e: msg = "Error opening " + _loc_file_name + " append mode - " + str(e) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) - -# # # # # # # # # # # # # -# fx: close_file -# # # # # # # # # # # # # + # # # # # # # # # # # # # + # fx: close_file + # # # # # # # # # # # # # """ close _loc_file_name, return True with success, False otherwise """ @@ -216,11 +216,11 @@ def close_file(_loc_fd, _loc_filename): _loc_fd.close() return True except Exception as e: - msg = "Error closing %s : %s - results indeterminate" % ( - _loc_filename, str(e)) + msg = "Error closing %s : %s - results indeterminate" % (_loc_filename, str(e)) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) return False + # # # # # # # # # # # # # # # # # # # # fx: ecomp_logger -> log in eelf format until standard # is released for python via LOG-161 @@ -317,13 +317,22 @@ def ecomp_logger(_log_type, _sev, _error_code, _msg): # catch invalid log type if _log_type < 1 or _log_type > 5: - msg = ("INVALID log type: %s " % _log_type) - _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s" - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, (msg + _msg))) + msg = "INVALID log type: %s " % _log_type + _out_rec = "%s|%s|%s|%s|%s|%s|%s|%s|%s" % ( + calling_fx, + "snmptrapd", + unused, + unused, + unused, + tds.SEV_TYPES[_sev], + _error_code, + unused, + (msg + _msg), + ) try: - tds.eelf_error_fd.write('%s|%s\n' % (t_out, str(_out_rec))) + tds.eelf_error_fd.write("%s|%s\n" % (t_out, str(_out_rec))) if log_to_stdout: - print('%s|%s' % (t_out, str(_out_rec))) + print("%s|%s" % (t_out, str(_out_rec))) except Exception as e: stdout_logger(str(_out_rec)) @@ -335,34 +344,79 @@ def ecomp_logger(_log_type, _sev, _error_code, _msg): # _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' # _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' # % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (unused, unused, calling_fx, unused, "snmptrapd", unused, unused, unused, unused, unused, unused, unused, tds.SEV_TYPES[_sev], unused, unused, unused, unused, unused, unused, unused, unused, unused, unused, unused, unused, unused, _msg)) + _out_rec = "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s" % ( + unused, + unused, + calling_fx, + unused, + "snmptrapd", + unused, + unused, + unused, + unused, + unused, + unused, + unused, + tds.SEV_TYPES[_sev], + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + unused, + _msg, + ) try: - tds.eelf_error_fd.write('%s|%s|%s\n' % (t_out, t_out, str(_out_rec))) + tds.eelf_error_fd.write("%s|%s|%s\n" % (t_out, t_out, str(_out_rec))) if log_to_stdout: - print('%s|%s|%s' % (t_out, t_out, str(_out_rec))) + print("%s|%s|%s" % (t_out, t_out, str(_out_rec))) except Exception as e: stdout_logger(str(_out_rec)) elif _log_type == tds.LOG_TYPE_AUDIT: # log message in AUDIT format # _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' # % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) + _out_rec = "%s|%s|%s|%s|%s|%s|%s|%s|%s" % ( + calling_fx, + "snmptrapd", + unused, + unused, + unused, + tds.SEV_TYPES[_sev], + _error_code, + unused, + _msg, + ) try: - tds.eelf_audit_fd.write('%s|%s\n' % (t_out, str(_out_rec))) + tds.eelf_audit_fd.write("%s|%s\n" % (t_out, str(_out_rec))) if log_to_stdout: - print('%s|%s' % (t_out, str(_out_rec))) + print("%s|%s" % (t_out, str(_out_rec))) except Exception as e: stdout_logger(str(_out_rec)) elif _log_type == tds.LOG_TYPE_METRICS: # log message in METRICS format - _out_rec = ('%s|%s|%s|%s|%s|%s|%s|%s|%s' - % (calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) + _out_rec = "%s|%s|%s|%s|%s|%s|%s|%s|%s" % ( + calling_fx, + "snmptrapd", + unused, + unused, + unused, + tds.SEV_TYPES[_sev], + _error_code, + unused, + _msg, + ) try: - tds.eelf_metrics_fd.write('%s|%s\n' % (t_out, str(_out_rec))) + tds.eelf_metrics_fd.write("%s|%s\n" % (t_out, str(_out_rec))) if log_to_stdout: - print('%s|%s' % (t_out, str(_out_rec))) + print("%s|%s" % (t_out, str(_out_rec))) except Exception as e: stdout_logger(str(_out_rec)) @@ -370,17 +424,28 @@ def ecomp_logger(_log_type, _sev, _error_code, _msg): # DLFM: too much I/O !!! # always write to debug; we need ONE logfile that has time-sequence full view !!! # log message in DEBUG format - _out_rec = ("%s|%s|%s|%s|%s|%s|%s|%s|%s|%s" - % (unused, calling_fx, "snmptrapd", unused, unused, unused, tds.SEV_TYPES[_sev], _error_code, unused, _msg)) + _out_rec = "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s" % ( + unused, + calling_fx, + "snmptrapd", + unused, + unused, + unused, + tds.SEV_TYPES[_sev], + _error_code, + unused, + _msg, + ) try: - tds.eelf_debug_fd.write('%s|%s\n' % (t_out, str(_out_rec))) + tds.eelf_debug_fd.write("%s|%s\n" % (t_out, str(_out_rec))) if log_to_stdout: - print('%s|%s' % (t_out, str(_out_rec))) + print("%s|%s" % (t_out, str(_out_rec))) except Exception as e: stdout_logger(str(_out_rec)) return True + # # # # # # # # # # # # # # fx: stdout_logger # # # # # # # # # # # # # @@ -403,4 +468,4 @@ def stdout_logger(_msg): t_out = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S,%f")[:-3] - print('%s %s' % (t_out, _msg)) + print("%s %s" % (t_out, _msg)) diff --git a/snmptrap/mod/trapd_runtime_pid.py b/snmptrap/mod/trapd_runtime_pid.py index 74668f5..c647885 100644 --- a/snmptrap/mod/trapd_runtime_pid.py +++ b/snmptrap/mod/trapd_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ trapd_runtime_pid maintains a 'PID file' (file that contains the PID of currently running trap receiver) """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import logging import os @@ -47,8 +47,8 @@ def save_pid(_pid_file_name): """ try: - pid_fd = open(_pid_file_name, 'w') - pid_fd.write('%d' % os.getpid()) + pid_fd = open(_pid_file_name, "w") + pid_fd.write("%d" % os.getpid()) pid_fd.close() except IOError: print("IOError saving PID file %s :" % _pid_file_name) diff --git a/snmptrap/mod/trapd_settings.py b/snmptrap/mod/trapd_settings.py index 41dc18f..26e7c9b 100644 --- a/snmptrap/mod/trapd_settings.py +++ b/snmptrap/mod/trapd_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" def init(): diff --git a/snmptrap/mod/trapd_snmpv3.py b/snmptrap/mod/trapd_snmpv3.py index b421ae1..56ecb5c 100644 --- a/snmptrap/mod/trapd_snmpv3.py +++ b/snmptrap/mod/trapd_snmpv3.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ module for snmpv3 support """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import json import os @@ -60,7 +60,7 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): try: v3_users = _cbs_config["snmpv3_config"]["usm_users"] except Exception as e: - msg = ("No V3 users defined") + msg = "No V3 users defined" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) return _py_config, _snmp_engine @@ -68,13 +68,13 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): # engineId try: - ctx_engine_id = v3_user['engineId'] + ctx_engine_id = v3_user["engineId"] except Exception as e: ctx_engine_id = None # user try: - userName = v3_user['user'] + userName = v3_user["user"] except Exception as e: userName = None @@ -86,40 +86,38 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): # usmHMACMD5AuthProtocol try: - authKey = v3_user['usmHMACMD5AuthProtocol'] + authKey = v3_user["usmHMACMD5AuthProtocol"] authProtocol = config.usmHMACMD5AuthProtocol except Exception as e: try: - authKey = v3_user['usmHMACSHAAuthProtocol'] + authKey = v3_user["usmHMACSHAAuthProtocol"] authProtocol = config.usmHMACSHAAuthProtocol except Exception as e: try: - authKey = v3_user['usmHMAC128SHA224AuthProtocol'] + authKey = v3_user["usmHMAC128SHA224AuthProtocol"] authProtocol = config.usmHMAC128SHA224AuthProtocol except Exception as e: try: - authKey = v3_user['usmHMAC192SHA256AuthProtocol'] + authKey = v3_user["usmHMAC192SHA256AuthProtocol"] authProtocol = config.usmHMAC192SHA256AuthProtocol except Exception as e: try: - authKey = v3_user['usmHMAC256SHA384AuthProtocol'] + authKey = v3_user["usmHMAC256SHA384AuthProtocol"] authProtocol = config.usmHMAC256SHA384AuthProtocol except Exception as e: try: - authKey = v3_user['usmHMAC384SHA512AuthProtocol'] + authKey = v3_user["usmHMAC384SHA512AuthProtocol"] authProtocol = config.usmHMAC384SHA512AuthProtocol except Exception as e: try: - authKey = v3_user['usmNoAuthProtocol'] + authKey = v3_user["usmNoAuthProtocol"] authProtocol = config.usmNoAuthProtocol except Exception as e: # FMDL: default to NoAuth, or error/skip entry? - msg = ( - "No auth specified for user %s ?" % (userName)) + msg = "No auth specified for user %s ?" % (userName) authKey = None authProtocol = config.usmNoAuthProtocol - ecomp_logger( - tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # privacy # find options at -> site-packages/pysnmp/entity/config.py @@ -129,56 +127,57 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): # usm3DESEDEPriv try: - privKey = v3_user['usm3DESEDEPrivProtocol'] + privKey = v3_user["usm3DESEDEPrivProtocol"] privProtocol = config.usm3DESEDEPrivProtocol except Exception as e: # usmAesCfb128Protocol try: - privKey = v3_user['usmAesCfb128Protocol'] + privKey = v3_user["usmAesCfb128Protocol"] privProtocol = config.usmAesCfb128Protocol except Exception as e: # usmAesCfb192Protocol try: - privKey = v3_user['usmAesCfb192Protocol'] + privKey = v3_user["usmAesCfb192Protocol"] privProtocol = config.usmAesCfb192Protocol except Exception as e: # usmAesBlumenthalCfb192Protocol try: - privKey = v3_user['usmAesBlumenthalCfb192Protocol'] + privKey = v3_user["usmAesBlumenthalCfb192Protocol"] privProtocol = config.usmAesBlumenthalCfb192Protocol except Exception as e: # usmAesCfb256Protocol try: - privKey = v3_user['usmAesCfb256Protocol'] + privKey = v3_user["usmAesCfb256Protocol"] privProtocol = config.usmAesCfb256Protocol except Exception as e: # usmAesBlumenthalCfb256Protocol try: - privKey = v3_user['usmAesBlumenthalCfb256Protocol'] + privKey = v3_user["usmAesBlumenthalCfb256Protocol"] privProtocol = config.usmAesBlumenthalCfb256Protocol except Exception as e: # usmDESPrivProtocol try: - privKey = v3_user['usmDESPrivProtocol'] + privKey = v3_user["usmDESPrivProtocol"] privProtocol = config.usmDESPrivProtocol except Exception as e: # usmNoPrivProtocol try: - privKey = v3_user['usmNoPrivProtocol'] + privKey = v3_user["usmNoPrivProtocol"] privProtocol = config.usmNoPrivProtocol except Exception as e: # FMDL: default to NoPriv, or error/skip entry? - msg = ( - "No priv specified for user %s" % (userName)) - ecomp_logger( - tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + msg = "No priv specified for user %s" % (userName) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) privKey = None privProtocol = config.usmNoPrivProtocol # break # msg = ("userName: %s authKey: %s authProtocol: %s privKey: %s privProtocol: %s engineId: %s % (userName, authKey, authProtocol, privKey, privProtocol, ctx_engine_id)) - msg = ("userName: %s authKey: **** authProtocol: %s privKey: **** privProtocol: %s engineId: ****" % - (userName, authProtocol, privProtocol)) + msg = "userName: %s authKey: **** authProtocol: %s privKey: **** privProtocol: %s engineId: ****" % ( + userName, + authProtocol, + privProtocol, + ) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # user: usr-md5-des, auth: MD5, priv DES, contextEngineId: 8000000001020304 @@ -189,16 +188,15 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): if ctx_engine_id is not None: config.addV3User( - _snmp_engine, userName, - authProtocol, authKey, - privProtocol, privKey, - contextEngineId=v2c.OctetString(hexValue=ctx_engine_id) + _snmp_engine, + userName, + authProtocol, + authKey, + privProtocol, + privKey, + contextEngineId=v2c.OctetString(hexValue=ctx_engine_id), ) else: - config.addV3User( - _snmp_engine, userName, - authProtocol, authKey, - privProtocol, privKey - ) + config.addV3User(_snmp_engine, userName, authProtocol, authKey, privProtocol, privKey) return _py_config, _snmp_engine diff --git a/snmptrap/mod/trapd_stats_settings.py b/snmptrap/mod/trapd_stats_settings.py index df4da0e..2aacb2e 100644 --- a/snmptrap/mod/trapd_stats_settings.py +++ b/snmptrap/mod/trapd_stats_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" def init(): diff --git a/snmptrap/mod/trapd_stormwatch.py b/snmptrap/mod/trapd_stormwatch.py index 9e41bd0..4c374bb 100644 --- a/snmptrap/mod/trapd_stormwatch.py +++ b/snmptrap/mod/trapd_stormwatch.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ threshold (storm), and if so it will return "False" so the trap can be logged and immediately discarded """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import sys import os @@ -98,11 +98,11 @@ def sw_clear_dicts(): sws.sw_config_category.clear() return True except Exception as e: - msg = "unable to reset stormwatch dictionaries - results will be indeterminate: %s" % ( - e) + msg = "unable to reset stormwatch dictionaries - results will be indeterminate: %s" % (e) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) return False + # # # # # # # # # # # # # # fx: sw_load_trap_config # - load trap configurations from CBS response @@ -121,10 +121,10 @@ def sw_load_trap_config(_config): try: sws.sw_storm_active_dict ret = sw_clear_dicts() - msg = ("reset existing sws dictionaries to empty") + msg = "reset existing sws dictionaries to empty" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except NameError: - msg = ("sws dictionaries not present - initializing") + msg = "sws dictionaries not present - initializing" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) ret = sw_init() @@ -134,24 +134,22 @@ def sw_load_trap_config(_config): # get metric % threshold for logging trap count by-agent to metric log try: stats.metric_log_notification_threshold_pct = int( - _config["trap_config"]["metric_log_notification_threshold_pct"]) - msg = ("metric_log_notification_threshold_pct value: %d" % - stats.metric_log_notification_threshold_pct) + _config["trap_config"]["metric_log_notification_threshold_pct"] + ) + msg = "metric_log_notification_threshold_pct value: %d" % stats.metric_log_notification_threshold_pct ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: - msg = ( - "metric_log_notification_threshold_pct not present in config - default to 25") + msg = "metric_log_notification_threshold_pct not present in config - default to 25" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) stats.metric_log_notification_threshold_pct = 25 # get stormwatch interval; default to 60 seconds try: - sws.sw_interval_in_seconds = int( - _config["trap_config"]["sw_interval_in_seconds"]) - msg = ("sw_interval_in_seconds value: %d" % sws.sw_interval_in_seconds) + sws.sw_interval_in_seconds = int(_config["trap_config"]["sw_interval_in_seconds"]) + msg = "sw_interval_in_seconds value: %d" % sws.sw_interval_in_seconds ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: - msg = ("sw_interval_in_seconds not present in config - default to 60 seconds") + msg = "sw_interval_in_seconds not present in config - default to 60 seconds" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) sws.sw_interval_in_seconds = 60 @@ -159,7 +157,7 @@ def sw_load_trap_config(_config): try: notify_oids = _config["trap_config"]["notify_oids"] except Exception as e: - msg = ("no trap_config or notify_oids defined") + msg = "no trap_config or notify_oids defined" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) return False @@ -167,65 +165,66 @@ def sw_load_trap_config(_config): for trap_block in notify_oids: # oid try: - _oid = trap_block['oid'] + _oid = trap_block["oid"] except Exception as e: - msg = ( - "missing oid value in notify_oids - oid section of CBS config - using empty value, disregard entry") - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + msg = "missing oid value in notify_oids - oid section of CBS config - using empty value, disregard entry" + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) _oid = None # sw_high_water_in_interval try: - _sw_high_water_in_interval = int( - trap_block['sw_high_water_in_interval']) + _sw_high_water_in_interval = int(trap_block["sw_high_water_in_interval"]) except Exception as e: msg = ( - "missing sw_high_water_in_interval value in notify_oids - oid section of CBS config - using empty value") - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + "missing sw_high_water_in_interval value in notify_oids - oid section of CBS config - using empty value" + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) _sw_high_water_in_interval = None # sw_low_water_in_interval try: - _sw_low_water_in_interval = int( - trap_block['sw_low_water_in_interval']) + _sw_low_water_in_interval = int(trap_block["sw_low_water_in_interval"]) except Exception as e: msg = ( - "missing sw_low_water_in_interval value in notify_oids - oid section of CBS config - using empty value") - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + "missing sw_low_water_in_interval value in notify_oids - oid section of CBS config - using empty value" + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) _sw_low_water_in_interval = None # category try: - _category = trap_block['category'] + _category = trap_block["category"] except Exception as e: - msg = ( - "missing category value in notify_oids - oid section of CBS config - using empty value") - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + msg = "missing category value in notify_oids - oid section of CBS config - using empty value" + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) _category = None - if (_oid is not None and _category is not None and - _sw_low_water_in_interval is not None and _sw_high_water_in_interval is not None and - _sw_low_water_in_interval < _sw_high_water_in_interval): + if ( + _oid is not None + and _category is not None + and _sw_low_water_in_interval is not None + and _sw_high_water_in_interval is not None + and _sw_low_water_in_interval < _sw_high_water_in_interval + ): # FMDL: Do we actually need sw_config_oid_dict? - msg = ("oid: %s sw_high_water_in_interval: %d sw_low_water_in_interval: %d category: %s" % ( - _oid, _sw_high_water_in_interval, _sw_low_water_in_interval, _category)) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = "oid: %s sw_high_water_in_interval: %d sw_low_water_in_interval: %d category: %s" % ( + _oid, + _sw_high_water_in_interval, + _sw_low_water_in_interval, + _category, + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) sws.sw_config_oid_dict[_oid] = True sws.sw_config_low_water_in_interval_dict[_oid] = _sw_low_water_in_interval sws.sw_config_high_water_in_interval_dict[_oid] = _sw_high_water_in_interval sws.sw_config_category[_oid] = _category trap_block_counter += 1 else: - msg = ("Missing or incorrect value for stormwatch config entry %d: skipping: %s" % ( - trap_block_counter, trap_block)) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) - + msg = "Missing or incorrect value for stormwatch config entry %d: skipping: %s" % ( + trap_block_counter, + trap_block, + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) return trap_block_counter @@ -246,17 +245,18 @@ def sw_log_metrics(): :Variables: """ - msg = "total notifications: %d, interval in seconds: %d" % ( - stats.total_notifications, sws.sw_interval_in_seconds) + msg = "total notifications: %d, interval in seconds: %d" % (stats.total_notifications, sws.sw_interval_in_seconds) ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_INFO, tds.CODE_GENERAL, msg) # print metrics for total traps and traps-per-second avg # during sample interval avg_traps_per_second = stats.total_notifications / 60 msg = "total traps: %d, interval in seconds: %d, average traps-per-second: %d" % ( - stats.total_notifications, sws.sw_interval_in_seconds, avg_traps_per_second) - ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + stats.total_notifications, + sws.sw_interval_in_seconds, + avg_traps_per_second, + ) + ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_WARN, tds.CODE_GENERAL, msg) # print metrics for any agent that represents more than stats.metric_log_notification_threshold_pct # during sample interval @@ -265,9 +265,13 @@ def sw_log_metrics(): p = c / stats.total_notifications * 100 if p > stats.metric_log_notification_threshold_pct: msg = "agent: %s, notifications: %d, interval in seconds: %d, percent of total traps: %d" % ( - k, c, sws.sw_interval_in_seconds, p) - ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + k, + c, + sws.sw_interval_in_seconds, + p, + ) + ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_WARN, tds.CODE_GENERAL, msg) + # # # # # # # # # # # # # # fx: stats_increment_counters @@ -339,8 +343,10 @@ def sw_storm_active(_loc_agent, _loc_oid): # if we are at or above stormwatch interval, re-eval and re-set elapsed_time = int(time.time()) - sws.sw_last_stormwatch_dict_analysis if elapsed_time >= sws.sw_interval_in_seconds: - msg = "%d seconds has elapsed since stormwatch dictionary eval (%d second threshold) - check and reset counters " % ( - elapsed_time, sws.sw_interval_in_seconds) + msg = ( + "%d seconds has elapsed since stormwatch dictionary eval (%d second threshold) - check and reset counters " + % (elapsed_time, sws.sw_interval_in_seconds) + ) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) sw_log_metrics() sw_reset_counter_dict() @@ -349,8 +355,7 @@ def sw_storm_active(_loc_agent, _loc_oid): # that means it's participating in stormwatch, otherwise bail out try: _high_water_val = sws.sw_config_high_water_in_interval_dict[_loc_oid] - msg = "%s present in stormwatch config - high water value: %d" % ( - _loc_oid, _high_water_val) + msg = "%s present in stormwatch config - high water value: %d" % (_loc_oid, _high_water_val) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: return False @@ -375,25 +380,32 @@ def sw_storm_active(_loc_agent, _loc_oid): # if we got this far, trap is in stormwatch configs, we've incremented # counter in sw_storm_counter_dict - figure out if we are over limit if sws.sw_storm_counter_dict[_dict_key] > _high_water_val: - print(f"sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})") + print( + f"sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})" + ) _loc_agent = _dict_key.split()[0] _loc_oid = _dict_key.split()[1] msg = "STORM ACTIVE: received %d events (%s) from %s (greater than high water threshold: %d)" % ( - sws.sw_storm_counter_dict[_dict_key], _loc_oid, _loc_agent, _high_water_val) + sws.sw_storm_counter_dict[_dict_key], + _loc_oid, + _loc_agent, + _high_water_val, + ) ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_WARN, tds.CODE_GENERAL, msg) try: sws.sw_storm_active_dict[_dict_key] = True except Exception as e: - msg = "ERROR setting %s in storm active state: %s " % ( - _dict_key, e) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, - tds.CODE_GENERAL, msg) + msg = "ERROR setting %s in storm active state: %s " % (_dict_key, e) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) return True else: - print(f"NOT sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})") + print( + f"NOT sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})" + ) return False + # # # # # # # # # # # # # # fx: sw_reset_counter_dict # - reset counter dictionary on boundaries @@ -416,11 +428,9 @@ def sw_reset_counter_dict(): # publish stats to MR... try: msg = "publish counts-by-oid from stats.oid_counter_dict" - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) msg = "publish count-by-agent from stats.agent_counter_dict" - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: msg = "unable to publish counts by oid and agent to MR: " % (e) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) @@ -431,8 +441,7 @@ def sw_reset_counter_dict(): stats.agent_counter_dict.clear() stats.total_notifications = 0 except Exception as e: - msg = "unable to reset counts by oid and agent dictionaries - stats will be INNACURATE: " % ( - e) + msg = "unable to reset counts by oid and agent dictionaries - stats will be INNACURATE: " % (e) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) # @@ -457,36 +466,45 @@ def sw_reset_counter_dict(): if sws.sw_storm_counter_dict[k] >= _high_water_val: msg = "%s remaining in storm state, received %d events (GE to upper threshold: %d)" % ( - k, sws.sw_storm_counter_dict[k], _high_water_val) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + k, + sws.sw_storm_counter_dict[k], + _high_water_val, + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) sws.sw_storm_counter_dict[k] = 0 else: _low_water_val = sws.sw_config_low_water_in_interval_dict[_loc_oid] if sws.sw_storm_counter_dict[k] < _low_water_val: try: msg = "STORM OVER: received %d events (%s) from %s (less than low water threshold: %d)" % ( - sws.sw_storm_counter_dict[k], _loc_oid, _loc_agent, _low_water_val) - ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + sws.sw_storm_counter_dict[k], + _loc_oid, + _loc_agent, + _low_water_val, + ) + ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_WARN, tds.CODE_GENERAL, msg) del sws.sw_storm_active_dict[k] sws.sw_storm_counter_dict[k] = 0 except Exception as e: - msg = "unable to remove %s from storm active dictionary - TRAPS MAY BE DISCARDED UNINTENTIONALLY! Reason: %s " % ( - k, e) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, - tds.CODE_GENERAL, msg) + msg = ( + "unable to remove %s from storm active dictionary - TRAPS MAY BE DISCARDED UNINTENTIONALLY! Reason: %s " + % (k, e) + ) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) else: msg = "%s remaining in storm state, received %d events (GE to lower threshold: %d)" % ( - k, sws.sw_storm_counter_dict[k], _low_water_val) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + k, + sws.sw_storm_counter_dict[k], + _low_water_val, + ) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_INFO, tds.CODE_GENERAL, msg) sws.sw_storm_counter_dict[k] = 0 sws.sw_last_stormwatch_dict_analysis = int(time.time()) return True + # # # # # # # # # # # # # # fx: sw_increment_counter # - increment OID and agent trap counters @@ -511,14 +529,11 @@ def sw_increment_counter(_dict_key): try: sws.sw_storm_counter_dict[_dict_key] += 1 - msg = "stormwatch counter for %s now: %d" % ( - _dict_key, sws.sw_storm_counter_dict[_dict_key]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = "stormwatch counter for %s now: %d" % (_dict_key, sws.sw_storm_counter_dict[_dict_key]) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) return True except Exception as E: msg = "first trap for %s - init stormwatch counter to 1" % (_dict_key) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) sws.sw_storm_counter_dict[_dict_key] = 1 return True diff --git a/snmptrap/mod/trapd_stormwatch_settings.py b/snmptrap/mod/trapd_stormwatch_settings.py index f2586f9..a4bc410 100644 --- a/snmptrap/mod/trapd_stormwatch_settings.py +++ b/snmptrap/mod/trapd_stormwatch_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ """ """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" def init(): diff --git a/snmptrap/mod/trapd_vb_types.py b/snmptrap/mod/trapd_vb_types.py index 98d5d2c..0b73e68 100644 --- a/snmptrap/mod/trapd_vb_types.py +++ b/snmptrap/mod/trapd_vb_types.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ module for converting varbind types from Net-SNMP to PYSNMP """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" import json import os @@ -42,18 +42,18 @@ prog_name = os.path.basename(__file__) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # _pysnmp_to_netsnmp_vb_type = { - 'Integer32': 'integer', - 'Integer': 'integer', - 'Gauge32': 'unsigned', - 'Counter32': 'counter32', - 'OctetString': 'octet', - 'py_type_5': 'hex', - 'py_type_6': 'decimal', - 'Null': 'null', - 'ObjectIdentifier': 'oid', - 'TimeTicks': 'timeticks', - 'IpAddress': 'ipaddress', - 'Bits': 'bits' + "Integer32": "integer", + "Integer": "integer", + "Gauge32": "unsigned", + "Counter32": "counter32", + "OctetString": "octet", + "py_type_5": "hex", + "py_type_6": "decimal", + "Null": "null", + "ObjectIdentifier": "oid", + "TimeTicks": "timeticks", + "IpAddress": "ipaddress", + "Bits": "bits", } default_vb_type = "octet" @@ -77,7 +77,6 @@ def pysnmp_to_netsnmp_varbind_convert(_pysnmp_vb_type): return _netsnmp_vb_type except Exception as e: # if not found, return original pysnmp type - msg = ("%s not configured as pysnmp varbind type - returning %s" - % (_pysnmp_vb_type,default_vb_type)) + msg = "%s not configured as pysnmp varbind type - returning %s" % (_pysnmp_vb_type, default_vb_type) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) return default_vb_type diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py index 5f46f34..ddb6b32 100644 --- a/snmptrap/snmptrapd.py +++ b/snmptrap/snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ is published to a dmaap instance that has been defined by controller. onap dcae snmp trap publish dmaap """ -__docformat__ = 'restructuredtext' +__docformat__ = "restructuredtext" # basics import argparse @@ -57,6 +57,7 @@ import uuid as uuid_mod # pysnmp from pysnmp.entity import engine, config from pysnmp.carrier.asyncio.dgram import udp, udp6 + # from pysnmp.carrier.asyncore.dgram import udp from pysnmp.entity.rfc3413 import ntfrcv from pysnmp.proto.api import v2c @@ -68,12 +69,10 @@ import trapd_settings as tds from trapd_runtime_pid import save_pid, rm_pid from trapd_get_cbs_config import get_cbs_config from trapd_exit import cleanup_and_exit -from trapd_http_session import init_session_obj, close_session_obj,\ - reset_session_obj +from trapd_http_session import init_session_obj, close_session_obj, reset_session_obj from trapd_snmpv3 import load_snmpv3_credentials from trapd_vb_types import pysnmp_to_netsnmp_varbind_convert -from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file,\ - close_file, ecomp_logger, stdout_logger +from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file, close_file, ecomp_logger, stdout_logger import trapd_stormwatch_settings as sws import trapd_stormwatch as stormwatch @@ -99,8 +98,8 @@ def usage_err(): usage args """ - print('Incorrect usage invoked. Correct usage:') - print(' %s [-v]' % prog_name) + print("Incorrect usage invoked. Correct usage:") + print(" %s [-v]" % prog_name) cleanup_and_exit(1, "undefined") @@ -127,10 +126,8 @@ def load_all_configs(_signum, _frame): """ if int(_signum) != 0: - msg = ("received signal %s at frame %s; re-reading configs" - % (_signum, _frame)) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = "received signal %s at frame %s; re-reading configs" % (_signum, _frame) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # re-request config from broker: if not get_cbs_config(): @@ -138,19 +135,14 @@ def load_all_configs(_signum, _frame): stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) else: - current_runtime_config_file_name = ( - tds.c_config['files']['runtime_base_dir'] + - "/tmp/current_config.json") + current_runtime_config_file_name = tds.c_config["files"]["runtime_base_dir"] + "/tmp/current_config.json" if int(_signum) != 0: - msg = "updated config logged to : %s" % \ - current_runtime_config_file_name + msg = "updated config logged to : %s" % current_runtime_config_file_name else: - msg = "current config logged to : %s" % \ - current_runtime_config_file_name - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, - msg) + msg = "current config logged to : %s" % current_runtime_config_file_name + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - with open(current_runtime_config_file_name, 'w') as outfile: + with open(current_runtime_config_file_name, "w") as outfile: json.dump(tds.c_config, outfile) # reset http session based on latest config @@ -158,10 +150,8 @@ def load_all_configs(_signum, _frame): # reload sw participating entries, reset counter dictionary traps_configured = stormwatch.sw_load_trap_config(tds.c_config) - msg = "encountered %d trap configurations in CBS/json config" % \ - traps_configured - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, - msg) + msg = "encountered %d trap configurations in CBS/json config" % traps_configured + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) tds.last_minute = datetime.datetime.now().minute tds.last_hour = datetime.datetime.now().hour @@ -178,34 +168,33 @@ def resolve_ip(_loc_ip_addr_str): try: if int(tds.dns_cache_ip_expires[_loc_ip_addr_str] < int(time.time())): - raise Exception('cache expired for %s at %d - updating value' % - (_loc_ip_addr_str, - (tds.dns_cache_ip_expires[_loc_ip_addr_str]))) + raise Exception( + "cache expired for %s at %d - updating value" + % (_loc_ip_addr_str, (tds.dns_cache_ip_expires[_loc_ip_addr_str])) + ) else: agent_fqdn = tds.dns_cache_ip_to_name[_loc_ip_addr_str] except Exception as e: - msg = "dns cache expired or missing for %s - refreshing" % \ - _loc_ip_addr_str - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = "dns cache expired or missing for %s - refreshing" % _loc_ip_addr_str + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) try: - agent_fqdn, alias, addresslist = socket.gethostbyaddr( - _loc_ip_addr_str) + agent_fqdn, alias, addresslist = socket.gethostbyaddr(_loc_ip_addr_str) except Exception as e: agent_fqdn = _loc_ip_addr_str tds.dns_cache_ip_to_name[_loc_ip_addr_str] = agent_fqdn - tds.dns_cache_ip_expires[_loc_ip_addr_str] = ( - time.time() + int(tds.c_config['cache']['dns_cache_ttl_seconds'])) - msg = "cache for %s (%s) updated - set to expire at %d" % \ - (agent_fqdn, _loc_ip_addr_str, - tds.dns_cache_ip_expires[_loc_ip_addr_str]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + tds.dns_cache_ip_expires[_loc_ip_addr_str] = time.time() + int(tds.c_config["cache"]["dns_cache_ttl_seconds"]) + msg = "cache for %s (%s) updated - set to expire at %d" % ( + agent_fqdn, + _loc_ip_addr_str, + tds.dns_cache_ip_expires[_loc_ip_addr_str], + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) return agent_fqdn + # # # # # # # # # # # # # # fx: log_all_arriving_traps # # # # # # # # # # # # # @@ -214,12 +203,12 @@ def resolve_ip(_loc_ip_addr_str): def log_all_arriving_traps(): # roll logs as needed/defined in files.roll_frequency - if tds.c_config['files']['roll_frequency'] == "minute": + if tds.c_config["files"]["roll_frequency"] == "minute": curr_minute = datetime.datetime.now().minute if curr_minute != tds.last_minute: roll_all_logs() tds.last_minute = curr_minute - elif tds.c_config['files']['roll_frequency'] == "hour": + elif tds.c_config["files"]["roll_frequency"] == "hour": curr_hour = datetime.datetime.now().hour if curr_hour != tds.last_hour: roll_all_logs() @@ -233,28 +222,35 @@ def log_all_arriving_traps(): # always log arriving trap try: - time_now=int(round(time.time(),0)) - arrived_epoch_time_int=int(tds.trap_dict["time received"]) - tds.arriving_traps_fd.write('%d %s; %s %s %s %s %s %s %s %s %s %s %s %s %s\n' % - (time_now, - datetime.datetime.fromtimestamp(time_now).strftime("%a %b %d %H:%M:%S %Y"), - tds.trap_dict["time received"], - datetime.datetime.fromtimestamp(arrived_epoch_time_int).strftime("%a %b %d %H:%M:%S %Y"), - tds.trap_dict["trap category"], - tds.trap_dict["epoch_serno"], - tds.trap_dict["notify OID"], - tds.trap_dict["agent name"], - tds.trap_dict["agent address"], - tds.trap_dict["pdu agent name"], - tds.trap_dict["pdu agent address"], - tds.trap_dict["cambria.partition"], - tds.trap_dict["protocol version"], - tds.trap_dict["uuid"], - tds.all_vb_str)) + time_now = int(round(time.time(), 0)) + arrived_epoch_time_int = int(tds.trap_dict["time received"]) + tds.arriving_traps_fd.write( + "%d %s; %s %s %s %s %s %s %s %s %s %s %s %s %s\n" + % ( + time_now, + datetime.datetime.fromtimestamp(time_now).strftime("%a %b %d %H:%M:%S %Y"), + tds.trap_dict["time received"], + datetime.datetime.fromtimestamp(arrived_epoch_time_int).strftime("%a %b %d %H:%M:%S %Y"), + tds.trap_dict["trap category"], + tds.trap_dict["epoch_serno"], + tds.trap_dict["notify OID"], + tds.trap_dict["agent name"], + tds.trap_dict["agent address"], + tds.trap_dict["pdu agent name"], + tds.trap_dict["pdu agent address"], + tds.trap_dict["cambria.partition"], + tds.trap_dict["protocol version"], + tds.trap_dict["uuid"], + tds.all_vb_str, + ) + ) except Exception as e: msg = "Error writing to %s : %s - arriving trap %s NOT LOGGED" % ( - tds.arriving_traps_filename, str(e), tds.trap_dict["uuid"]) + tds.arriving_traps_filename, + str(e), + tds.trap_dict["uuid"], + ) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) @@ -272,14 +268,11 @@ def log_published_messages(_post_data_enclosed): ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) try: - tds.json_traps_fd.write('%s\n' % _post_data_enclosed) - msg = "successfully logged json for %s to %s" % ( - tds.trap_dict["uuid"], tds.json_traps_filename) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + tds.json_traps_fd.write("%s\n" % _post_data_enclosed) + msg = "successfully logged json for %s to %s" % (tds.trap_dict["uuid"], tds.json_traps_filename) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: - msg = "Error writing to %s : %s - trap %s NOT LOGGED" % ( - tds.json_traps_filename, str(e), tds.trap_dict["uuid"]) + msg = "Error writing to %s : %s - trap %s NOT LOGGED" % (tds.json_traps_filename, str(e), tds.trap_dict["uuid"]) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) @@ -303,8 +296,7 @@ def post_dmaap(): if tds.http_requ_session is None: msg = "tds.http_requ_session is None - getting new (%s)" % tds.http_requ_session - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) tds.http_requ_session = init_session_obj() # if only 1 trap, ship as-is @@ -312,7 +304,7 @@ def post_dmaap(): post_data_enclosed = tds.all_traps_json_str else: # otherwise, add brackets around package - post_data_enclosed = '[' + tds.all_traps_json_str + ']' + post_data_enclosed = "[" + tds.all_traps_json_str + "]" msg = "post_data_enclosed: %s" % (post_data_enclosed) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) @@ -320,68 +312,91 @@ def post_dmaap(): k = 0 dmaap_pub_success = False - while not dmaap_pub_success and k < (int(tds.c_config['publisher']['http_retries'])): + while not dmaap_pub_success and k < (int(tds.c_config["publisher"]["http_retries"])): try: - if tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'] == "" or tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'] is None: + if ( + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] == "" + or tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] is None + ): msg = "%d trap(s) : %s - attempt %d (unsecure)" % ( - tds.traps_since_last_publish, tds.trap_uuids_in_buffer, k) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) - http_resp = tds.http_requ_session.post(tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'], post_data_enclosed, - headers=http_headers, - timeout=tds.timeout_seconds) + tds.traps_since_last_publish, + tds.trap_uuids_in_buffer, + k, + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + http_resp = tds.http_requ_session.post( + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"], + post_data_enclosed, + headers=http_headers, + timeout=tds.timeout_seconds, + ) else: msg = "%d trap(s) : %s - attempt %d (secure)" % ( - tds.traps_since_last_publish, tds.trap_uuids_in_buffer, k) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) - http_resp = tds.http_requ_session.post(tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'], post_data_enclosed, - auth=(tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'], - tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_password']), - headers=http_headers, - timeout=tds.timeout_seconds) + tds.traps_since_last_publish, + tds.trap_uuids_in_buffer, + k, + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + http_resp = tds.http_requ_session.post( + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"], + post_data_enclosed, + auth=( + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"], + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"], + ), + headers=http_headers, + timeout=tds.timeout_seconds, + ) if http_resp.status_code == requests.codes.ok: - msg = "%d trap(s) successfully published: %s" % ( - tds.traps_since_last_publish, tds.trap_uuids_in_buffer) - ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = "%d trap(s) successfully published: %s" % (tds.traps_since_last_publish, tds.trap_uuids_in_buffer) + ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_INFO, tds.CODE_GENERAL, msg) log_published_messages(post_data_enclosed) tds.last_pub_time = time.time() dmaap_pub_success = True break else: msg = "Trap(s) %s publish attempt %d returned non-normal: %d %s" % ( - tds.trap_uuids_in_buffer, k, http_resp.status_code, http_resp.text) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + tds.trap_uuids_in_buffer, + k, + http_resp.status_code, + http_resp.text, + ) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) except OSError as e: msg = "OS exception while attempting to post %s attempt %d: (%s) %s %s" % ( - tds.trap_uuids_in_buffer, k, e.errno, e.strerror, str(e)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + tds.trap_uuids_in_buffer, + k, + e.errno, + e.strerror, + str(e), + ) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) except requests.exceptions.RequestException as e: msg = "Requests exception while attempting to post %s attempt %d: (%d) %s" % ( - tds.trap_uuids_in_buffer, int(k), int(e.errno), str(e.strerror)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + tds.trap_uuids_in_buffer, + int(k), + int(e.errno), + str(e.strerror), + ) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) k += 1 - if k < int(tds.c_config['publisher']['http_retries']): - msg = "sleeping %.4f seconds and retrying" % ( - tds.seconds_between_retries) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, - tds.CODE_GENERAL, msg) + if k < int(tds.c_config["publisher"]["http_retries"]): + msg = "sleeping %.4f seconds and retrying" % (tds.seconds_between_retries) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) time.sleep(tds.seconds_between_retries) else: break if not dmaap_pub_success: - msg = "ALL publish attempts failed for traps %s to URL %s "\ - % (tds.trap_uuids_in_buffer, tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url']) + msg = "ALL publish attempts failed for traps %s to URL %s " % ( + tds.trap_uuids_in_buffer, + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"], + ) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) # FMDL: This currently tries, then logs error and trashes buffer if all dmaap attempts fail. Better way? @@ -390,6 +405,7 @@ def post_dmaap(): tds.all_traps_json_str = "" tds.first_trap = True + # # # # # # # # # # # # # # # # # # # # fx: request_observer for community string rewrite # # # # # # # # # # # # # # # # # # # @@ -398,8 +414,9 @@ def post_dmaap(): def comm_string_rewrite_observer(snmpEngine, execpoint, variables, cbCtx): # match ALL community strings - if re.match('.*', str(variables['communityName'])): - variables['communityName'] = variables['communityName'].clone('public') + if re.match(".*", str(variables["communityName"])): + variables["communityName"] = variables["communityName"].clone("public") + # # # # # # # # # # # # # # # # # # # # fx: snmp_engine_observer_cb @@ -457,21 +474,20 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): else: tds.traps_in_epoch = 0 tds.last_epoch_second = epoch_second - traps_in_epoch_04d = format(tds.traps_in_epoch, '04d') - tds.trap_dict['epoch_arrived'] = epoch_second - tds.trap_dict['epoch_serno'] = int( - (str(epoch_second) + str(traps_in_epoch_04d))) + traps_in_epoch_04d = format(tds.traps_in_epoch, "04d") + tds.trap_dict["epoch_arrived"] = epoch_second + tds.trap_dict["epoch_serno"] = int((str(epoch_second) + str(traps_in_epoch_04d))) # assign uuid to trap tds.trap_dict["uuid"] = str(uuid_mod.uuid1()) # ip and hostname - ip_addr_str = str(variables['transportAddress'][0]) - # set agent address and name to source of packet, OVERWRITE if + ip_addr_str = str(variables["transportAddress"][0]) + # set agent address and name to source of packet, OVERWRITE if # .1.3.6.1.6.3.18.1.3.0 varbind encountered later in trap processing tds.trap_dict["agent address"] = ip_addr_str tds.trap_dict["agent name"] = resolve_ip(ip_addr_str) - # set overridden/logical address and name to source of packet so we know + # set overridden/logical address and name to source of packet so we know # original value if .1.3.6.1.6.3.18.1.3.0 shows up # NOTE: This does NOT change ever, label may change to # "overridden agent..." in the future for truth in nameing @@ -479,17 +495,15 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): tds.trap_dict["pdu agent name"] = tds.trap_dict["agent name"] # log arrival now that we have agent addr - msg = 'trap from %s %s, assigned uuid: %s' % \ - (ip_addr_str, tds.trap_dict["agent name"], tds.trap_dict["uuid"]) + msg = "trap from %s %s, assigned uuid: %s" % (ip_addr_str, tds.trap_dict["agent name"], tds.trap_dict["uuid"]) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - tds.trap_dict["cambria.partition"] = str(tds.trap_dict["agent name"]) # do not include cleartext community in pub tds.trap_dict["community"] = "" tds.trap_dict["community len"] = 0 - snmp_version = variables['securityModel'] + snmp_version = variables["securityModel"] if snmp_version == 1: tds.trap_dict["protocol version"] = "v1" # enterprise = variables['pdu']['enterprise'].prettyPrint() @@ -517,16 +531,19 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): tds.trap_dict["protocol version"] = "unknown" # tds.trap_dict['time received'] = epoch_msecond - tds.trap_dict['time received'] = epoch_second - tds.trap_dict['trap category'] = ( - tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url']).split('/')[-1] + tds.trap_dict["time received"] = epoch_second + tds.trap_dict["trap category"] = ( + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"] + ).split("/")[-1] return True + # # # # # # # # # # # # # # # # # # # # fx: request_observer for community string rewrite # # # # # # # # # # # # # # # # # # # + def add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val): """ Called for each varbind, adds individual attributes of varbind instance to @@ -548,10 +565,11 @@ def add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val): """ if vb_idx == 0: - tds.all_vb_str = 'varbinds:' + tds.all_vb_str = "varbinds:" - tds.all_vb_str = tds.all_vb_str + " [" + str(vb_idx) + "] " \ - + str(vb_oid) + " {" + vb_type + "} " + str(vb_val.prettyPrint()) + tds.all_vb_str = ( + tds.all_vb_str + " [" + str(vb_idx) + "] " + str(vb_oid) + " {" + vb_type + "} " + str(vb_val.prettyPrint()) + ) # try: # tds.all_vb_str = tds.all_vb_str + " [" + str(vb_idx) + "] " + vb_oid + " {" + vb_type + "} " + vb_val @@ -562,6 +580,7 @@ def add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val): # ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) # return 1 + def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): """ Called for each varbind, adds individual attributes of varbind instance to @@ -595,8 +614,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): # if second varbind, use as notifyOID for all snmp versions if vb_idx == 1: tds.trap_dict["notify OID"] = "." + _vb_value - tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count( - '.') + tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count(".") return 0 # if override varbind OID, use value as agent address @@ -614,15 +632,15 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): return 0 if tds.first_varbind: - tds.all_vb_json_str = ', \"varbinds\": [' + tds.all_vb_json_str = ', "varbinds": [' tds.first_varbind = False else: tds.all_vb_json_str = tds.all_vb_json_str + " ," _individual_vb_dict.clear() - _individual_vb_dict['varbind_oid'] = _vb_oid - _individual_vb_dict['varbind_type'] = _vb_type - _individual_vb_dict['varbind_value'] = _vb_value + _individual_vb_dict["varbind_oid"] = _vb_oid + _individual_vb_dict["varbind_type"] = _vb_type + _individual_vb_dict["varbind_value"] = _vb_value _individual_vb_json_str = json.dumps(_individual_vb_dict) @@ -632,8 +650,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): # Callback function for receiving notifications # noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal -def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, - varBinds, cbCtx): +def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx): """ Callback executed when trap arrives :Parameters: @@ -673,10 +690,8 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, # iterate over varbinds, add to json struct for vb_oid, vb_val in varBinds: - log_ret = add_varbind_to_log_string( - pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) - varbinds_added = add_varbind_to_json( - pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) + log_ret = add_varbind_to_log_string(pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) + varbinds_added = add_varbind_to_json(pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) payload_varbinds += varbinds_added pdu_varbind_count += 1 @@ -686,7 +701,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, # if varbinds present - which will almost always be the case - add all_vb_json_str to trap_json_message if payload_varbinds != 0: # close out vb array - tds.all_vb_json_str = tds.all_vb_json_str + ']' + tds.all_vb_json_str = tds.all_vb_json_str + "]" # remove last close bracket from curr_trap_json_str curr_trap_json_str = curr_trap_json_str[:-1] @@ -695,7 +710,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, curr_trap_json_str = curr_trap_json_str + tds.all_vb_json_str # add last close brace back in - curr_trap_json_str = curr_trap_json_str + '}' + curr_trap_json_str = curr_trap_json_str + "}" msg = "trap %s : %s" % (tds.trap_dict["uuid"], curr_trap_json_str) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) @@ -710,38 +725,41 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, # only add to publish buffer if stormwatch is NOT active if stormwatch.sw_storm_active(tds.trap_dict["agent address"], tds.trap_dict["notify OID"]): msg = "stormwatch active - deflecting notification %s from %s" % ( - tds.trap_dict["notify OID"], tds.trap_dict["agent address"]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + tds.trap_dict["notify OID"], + tds.trap_dict["agent address"], + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) else: msg = "adding %s to buffer" % (tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) if tds.first_trap: tds.all_traps_json_str = curr_trap_json_str tds.trap_uuids_in_buffer = tds.trap_dict["uuid"] tds.first_trap = False else: - tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + \ - ', ' + tds.trap_dict["uuid"] - tds.all_traps_json_str = tds.all_traps_json_str + ', ' + curr_trap_json_str + tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + ", " + tds.trap_dict["uuid"] + tds.all_traps_json_str = tds.all_traps_json_str + ", " + curr_trap_json_str - if tds.traps_since_last_publish >= int(tds.c_config['publisher']['max_traps_between_publishes']): + if tds.traps_since_last_publish >= int(tds.c_config["publisher"]["max_traps_between_publishes"]): msg = "num traps since last publish (%d) exceeds threshold (%d) - publish traps" % ( - tds.traps_since_last_publish, int(tds.c_config['publisher']['max_traps_between_publishes'])) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + tds.traps_since_last_publish, + int(tds.c_config["publisher"]["max_traps_between_publishes"]), + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) post_dmaap() - elif milliseconds_since_last_publish >= int(tds.c_config['publisher']['max_milliseconds_between_publishes']): - msg = "num milliseconds since last publish (%.0f) exceeds threshold - publish traps" % milliseconds_since_last_publish - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + elif milliseconds_since_last_publish >= int(tds.c_config["publisher"]["max_milliseconds_between_publishes"]): + msg = ( + "num milliseconds since last publish (%.0f) exceeds threshold - publish traps" + % milliseconds_since_last_publish + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) post_dmaap() else: - msg = "neither milliseconds_since_last_publish (%.0f) or traps_since_last_publish (%d) exceed threshold - continue" % ( - milliseconds_since_last_publish, tds.traps_since_last_publish) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, - tds.CODE_GENERAL, msg) + msg = ( + "neither milliseconds_since_last_publish (%.0f) or traps_since_last_publish (%d) exceed threshold - continue" + % (milliseconds_since_last_publish, tds.traps_since_last_publish) + ) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # # # # # # # # # # # # # @@ -751,12 +769,9 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Post SNMP traps ' - 'to message bus') - parser.add_argument('-v', action="store_true", dest="verbose", - help="verbose logging") - parser.add_argument('-?', action="store_true", dest="usage_requested", - help="show command line use") + parser = argparse.ArgumentParser(description="Post SNMP traps " "to message bus") + parser.add_argument("-v", action="store_true", dest="verbose", help="verbose logging") + parser.add_argument("-?", action="store_true", dest="usage_requested", help="show command line use") # parse args args = parser.parse_args() @@ -784,7 +799,10 @@ if __name__ == "__main__": # get config binding service (CBS) values (either broker, or json file override) load_all_configs(0, 0) msg = "%s : %s version %s starting" % ( - prog_name, tds.c_config['snmptrapd']['title'], tds.c_config['snmptrapd']['version']) + prog_name, + tds.c_config["snmptrapd"]["title"], + tds.c_config["snmptrapd"]["version"], + ) stdout_logger(msg) # open various ecomp logs @@ -797,22 +815,33 @@ if __name__ == "__main__": stdout_logger(msg) # use specific flags or 'all' for full debugging # debug.setLogger(debug.Debug('dsp', 'msgproc')) - debug.setLogger(debug.Debug('all')) + debug.setLogger(debug.Debug("all")) # name and open arriving trap log - tds.arriving_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + \ - tds.c_config['files']['log_dir'] + "/" + \ - (tds.c_config['files']['arriving_traps_log']) + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + (tds.c_config["files"]["arriving_traps_log"]) + ) tds.arriving_traps_fd = open_file(tds.arriving_traps_filename) - msg = ("arriving traps logged to: %s" % tds.arriving_traps_filename) + msg = "arriving traps logged to: %s" % tds.arriving_traps_filename stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # name and open json trap log - tds.json_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + tds.c_config['files']['log_dir'] + "/" + "DMAAP_" + ( - tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'].split('/')[-1]) + ".json" + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + "DMAAP_" + + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) + + ".json" + ) tds.json_traps_fd = open_file(tds.json_traps_filename) - msg = ("published traps logged to: %s" % tds.json_traps_filename) + msg = "published traps logged to: %s" % tds.json_traps_filename stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) @@ -820,8 +849,9 @@ if __name__ == "__main__": signal.signal(signal.SIGUSR1, load_all_configs) # save current PID for future/external reference - tds.pid_file_name = tds.c_config['files']['runtime_base_dir'] + \ - '/' + tds.c_config['files']['pid_dir'] + '/' + prog_name + ".pid" + tds.pid_file_name = ( + tds.c_config["files"]["runtime_base_dir"] + "/" + tds.c_config["files"]["pid_dir"] + "/" + prog_name + ".pid" + ) msg = "Runtime PID file: %s" % tds.pid_file_name ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) rc = save_pid(tds.pid_file_name) @@ -839,8 +869,8 @@ if __name__ == "__main__": # UDP over IPv4 try: - ipv4_interface = tds.c_config['protocols']['ipv4_interface'] - ipv4_port = int(tds.c_config['protocols']['ipv4_port']) + ipv4_interface = tds.c_config["protocols"]["ipv4_interface"] + ipv4_port = int(tds.c_config["protocols"]["ipv4_port"]) try: # FIXME: this doesn't appear to throw an exception even if @@ -850,16 +880,11 @@ if __name__ == "__main__": # means to confirm proper privileges (then # close it and reopen w/ pysnmp api) config.addTransport( - snmp_engine, - udp.domainName + (1,), - udp.UdpTransport().openServerMode( - (ipv4_interface, ipv4_port)) + snmp_engine, udp.domainName + (1,), udp.UdpTransport().openServerMode((ipv4_interface, ipv4_port)) ) except Exception as e: - msg = "Unable to bind to %s:%d - %s" % ( - ipv4_interface, ipv4_port, str(e)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, - tds.CODE_GENERAL, msg) + msg = "Unable to bind to %s:%d - %s" % (ipv4_interface, ipv4_port, str(e)) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -870,22 +895,17 @@ if __name__ == "__main__": # UDP over IPv6 try: - ipv6_interface = tds.c_config['protocols']['ipv6_interface'] - ipv6_port = int(tds.c_config['protocols']['ipv6_port']) + ipv6_interface = tds.c_config["protocols"]["ipv6_interface"] + ipv6_port = int(tds.c_config["protocols"]["ipv6_port"]) try: config.addTransport( - snmp_engine, - udp6.domainName, - udp6.Udp6Transport().openServerMode( - (ipv6_interface, ipv6_port)) + snmp_engine, udp6.domainName, udp6.Udp6Transport().openServerMode((ipv6_interface, ipv6_port)) ) except Exception as e: - msg = "Unable to bind to %s:%d - %s" % ( - ipv6_interface, ipv6_port, str(e)) + msg = "Unable to bind to %s:%d - %s" % (ipv6_interface, ipv6_port, str(e)) stdout_logger(msg) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, - tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) cleanup_and_exit(1, tds.pid_file_name) except Exception as e: @@ -900,25 +920,21 @@ if __name__ == "__main__": # SecurityName <-> CommunityName mapping # to restrict trap reception to only those with specific community # strings - config.addV1System(snmp_engine, 'my-area', 'public') + config.addV1System(snmp_engine, "my-area", "public") # register comm_string_rewrite_observer for message arrival - snmp_engine.observer.registerObserver( - comm_string_rewrite_observer, - 'rfc2576.processIncomingMsg:writable' - ) + snmp_engine.observer.registerObserver(comm_string_rewrite_observer, "rfc2576.processIncomingMsg:writable") # # # # # # # # # # # # # SNMPv3 setup # # # # # # # # # # # # - config, snmp_engine = load_snmpv3_credentials( - config, snmp_engine, tds.c_config) + config, snmp_engine = load_snmpv3_credentials(config, snmp_engine, tds.c_config) # register snmp_engine_observer_cb for message arrival snmp_engine.observer.registerObserver( snmp_engine_observer_cb, - 'rfc3412.receiveMessage:request', - 'rfc3412.returnResponsePdu', + "rfc3412.receiveMessage:request", + "rfc3412.returnResponsePdu", ) # Register SNMP Application at the SNMP engine diff --git a/tests/__init__.py b/tests/__init__.py index af0fbb2..1d97a6a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,5 @@ -# ================================================================================ -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# ============LICENSE_START======================================================= +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. # empty __init__.py so that pytest can add correct path to coverage report, -- per pytest diff --git a/tests/snmp.setup.py b/tests/snmp.setup.py index ed565f2..c5a533c 100644 --- a/tests/snmp.setup.py +++ b/tests/snmp.setup.py @@ -1,6 +1,5 @@ -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import argparse import array @@ -44,6 +41,7 @@ import uuid as uuid_mod from collections import Counter from onap_dcae_cbs_docker_client.client import get_config from pysnmp.carrier.asyncio.dgram import udp, udp6 + # from pysnmp.carrier.asyncore.dgram import udp from pysnmp.entity import engine, config from pysnmp.entity.rfc3413 import ntfrcv @@ -60,14 +58,14 @@ install_reqs = parse_requirements("requirements.txt", session=PipSession()) reqs = [str(ir.req) for ir in install_reqs] setup( - name = "dcaegen2-collectors-snmptrap", - description = "snmp trap receiver for ONAP docker image", - version = "1.4.0", + name="dcaegen2-collectors-snmptrap", + description="snmp trap receiver for ONAP docker image", + version="1.4.0", packages=find_packages(), - author = "Dave L", - author_email = "dl3158@att.com", - license='Apache 2', - keywords = "", - url = "", - install_requires=reqs + author="Dave L", + author_email="dl3158@att.com", + license="Apache 2", + keywords="", + url="", + install_requires=reqs, ) diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py index cdd19a4..dee2aa0 100644 --- a/tests/test_snmptrapd.py +++ b/tests/test_snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,19 +31,19 @@ import trapd_get_cbs_config from pysnmp.hlapi import * from pysnmp import debug + class test_snmptrapd(unittest.TestCase): """ Test the save_pid mod """ - pytest_json_data = "{ \"snmptrapd\": { \"version\": \"2.0.3\", \"title\": \"ONAP SNMP Trap Receiver\" }, \"protocols\": { \"transport\": \"udp\", \"ipv4_interface\": \"0.0.0.0\", \"ipv4_port\": 6162, \"ipv6_interface\": \"::1\", \"ipv6_port\": 6162 }, \"cache\": { \"dns_cache_ttl_seconds\": 60 }, \"publisher\": { \"http_timeout_milliseconds\": 1500, \"http_retries\": 3, \"http_milliseconds_between_retries\": 750, \"http_primary_publisher\": \"true\", \"http_peer_publisher\": \"unavailable\", \"max_traps_between_publishes\": 10, \"max_milliseconds_between_publishes\": 10000 }, \"streams_publishes\": { \"sec_fault_unsecure\": { \"type\": \"message_router\", \"aaf_password\": null, \"dmaap_info\": { \"location\": \"mtl5\", \"client_id\": null, \"client_role\": null, \"topic_url\": \"http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP\" }, \"aaf_username\": null } }, \"files\": { \"runtime_base_dir\": \"/tmp/opt/app/snmptrap\", \"log_dir\": \"logs\", \"data_dir\": \"data\", \"pid_dir\": \"tmp\", \"arriving_traps_log\": \"snmptrapd_arriving_traps.log\", \"snmptrapd_diag\": \"snmptrapd_prog_diag.log\", \"traps_stats_log\": \"snmptrapd_stats.csv\", \"perm_status_file\": \"snmptrapd_status.log\", \"eelf_base_dir\": \"/tmp/opt/app/snmptrap/logs\", \"eelf_error\": \"error.log\", \"eelf_debug\": \"debug.log\", \"eelf_audit\": \"audit.log\", \"eelf_metrics\": \"metrics.log\", \"roll_frequency\": \"day\", \"minimum_severity_to_log\": 2 }, \"trap_config\": { \"sw_interval_in_seconds\": 60, \"notify_oids\": { \".1.3.6.1.4.1.9.0.1\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.2\": { \"sw_high_water_in_interval\": 101, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.3\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.4\": { \"sw_high_water_in_interval\": 10, \"sw_low_water_in_interval\": 3, \"category\": \"logonly\" } } }, \"snmpv3_config\": { \"usm_users\": [ { \"user\": \"usr-sha-aes256\", \"engineId\": \"8000000001020304\", \"usmHMACSHAAuth\": \"authkey1\", \"usmAesCfb256\": \"privkey1\" }, { \"user\": \"user1\", \"engineId\": \"8000000000000001\", \"usmHMACMD5Auth\": \"authkey1\", \"usmDESPriv\": \"privkey1\" }, { \"user\": \"user2\", \"engineId\": \"8000000000000002\", \"usmHMACSHAAuth\": \"authkey2\", \"usmAesCfb128\": \"privkey2\" }, { \"user\": \"user3\", \"engineId\": \"8000000000000003\", \"usmHMACSHAAuth\": \"authkey3\", \"usmAesCfb256\": \"privkey3\" } ] } }" + pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' # create copy of snmptrapd.json for pytest pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" - with open(pytest_json_config, 'w') as outfile: + with open(pytest_json_config, "w") as outfile: outfile.write(pytest_json_data) - def test_usage_err(self): """ Test usage error @@ -54,7 +54,6 @@ class test_snmptrapd(unittest.TestCase): assert pytest_wrapped_sys_exit.type == SystemExit assert pytest_wrapped_sys_exit.value.code == 1 - def test_load_all_configs(self): """ Test load of all configs @@ -65,7 +64,7 @@ class test_snmptrapd(unittest.TestCase): sw.sw_init() # request load of CBS data - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") result = trapd_get_cbs_config.get_cbs_config() self.assertEqual(result, True) @@ -82,7 +81,7 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") result = trapd_get_cbs_config.get_cbs_config() self.assertEqual(result, True) @@ -99,29 +98,54 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") result = trapd_get_cbs_config.get_cbs_config() # set last day to current tds.last_day = datetime.datetime.now().day # trap dict for logging - tds.trap_dict = {'uuid': '06f6e91c-3236-11e8-9953-005056865aac', 'agent address': '1.2.3.4', 'agent name': 'test-agent.nodomain.com', 'cambria.partition': 'test-agent.nodomain.com', 'community': '', 'community len': 0, 'epoch_serno': 15222068260000, 'protocol version': 'v2c', 'time received': 1522206826.2938566, 'trap category': 'ONAP-COLLECTOR-SNMPTRAP', 'sysUptime': '218567736', 'notify OID': '1.3.6.1.4.1.9999.9.9.999', 'notify OID len': 10} + tds.trap_dict = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "notify OID len": 10, + } # open eelf logs trapd_io.open_eelf_logs() # open trap logs - tds.arriving_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + \ - tds.c_config['files']['log_dir'] + "/" + \ - (tds.c_config['files']['arriving_traps_log']) + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + (tds.c_config["files"]["arriving_traps_log"]) + ) tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) # name and open json trap log - tds.json_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + tds.c_config['files']['log_dir'] + "/" + "DMAAP_" + ( - tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'].split('/')[-1]) + ".json" + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + "DMAAP_" + + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) + + ".json" + ) tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) - msg = ("published traps logged to: %s" % tds.json_traps_filename) + msg = "published traps logged to: %s" % tds.json_traps_filename trapd_io.stdout_logger(msg) trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) @@ -139,7 +163,7 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") trapd_get_cbs_config.get_cbs_config() # open eelf logs @@ -154,20 +178,26 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") trapd_get_cbs_config.get_cbs_config() - errorIndication, errorStatus, errorIndex, varbinds = next(sendNotification(SnmpEngine(), - CommunityData('not_public'), - UdpTransportTarget(('localhost', 6162)), - ContextData(), - 'trap', - [ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.1'), OctetString('test trap - ignore')), - ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.2'), OctetString('ONAP pytest trap'))]) + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("not_public"), + UdpTransportTarget(("localhost", 6162)), + ContextData(), + "trap", + [ + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), + ], + ) ) result = errorIndication self.assertEqual(result, None) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_snmptrapd_send_test_trap.py b/tests/test_snmptrapd_send_test_trap.py index 0e23def..8158629 100755 --- a/tests/test_snmptrapd_send_test_trap.py +++ b/tests/test_snmptrapd_send_test_trap.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,32 +21,37 @@ from pysnmp import debug iters = range(0, 10, 1) for i in iters: - errorIndication, errorStatus, errorIndex, varbinds = next(sendNotification(SnmpEngine(), - CommunityData('not_public'), - UdpTransportTarget(('localhost', 6164)), - ContextData(), - 'trap', - [ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.1'), OctetString('test trap - ignore')), - ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.2'), OctetString('ONAP pytest trap'))]) + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("not_public"), + UdpTransportTarget(("localhost", 6164)), + ContextData(), + "trap", + [ + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), + ], + ) ) - + if errorIndication: print(errorIndication) else: print("successfully sent first trap example, number %d" % i) for i in iters: - errorIndication, errorStatus, errorIndex, varbinds = next(sendNotification(SnmpEngine(), - CommunityData('public'), - UdpTransportTarget(('localhost', 6164)), - ContextData(), - 'trap', - NotificationType( - ObjectIdentity('.1.3.6.1.4.1.74.2.46.12.1.1') - ).addVarBinds( - ('.1.3.6.1.4.1.999.1', OctetString('ONAP pytest trap - ignore (varbind 1)')), - ('.1.3.6.1.4.1.999.2', OctetString('ONAP pytest trap - ignore (varbind 2)')) - ) + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("public"), + UdpTransportTarget(("localhost", 6164)), + ContextData(), + "trap", + NotificationType(ObjectIdentity(".1.3.6.1.4.1.74.2.46.12.1.1")).addVarBinds( + (".1.3.6.1.4.1.999.1", OctetString("ONAP pytest trap - ignore (varbind 1)")), + (".1.3.6.1.4.1.999.2", OctetString("ONAP pytest trap - ignore (varbind 2)")), + ), ) ) diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py index 3bbc386..0e38461 100644 --- a/tests/test_trapd_exit.py +++ b/tests/test_trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,36 +18,38 @@ import pytest import unittest import trapd_exit -pid_file="/tmp/test_pid_file" -pid_file_dne="/tmp/test_pid_file_NOT" - +pid_file = "/tmp/test_pid_file" +pid_file_dne = "/tmp/test_pid_file_NOT" + + class test_cleanup_and_exit(unittest.TestCase): """ Test the cleanup_and_exit mod """ - + def test_normal_exit(self): """ Test normal exit works as expected """ - open(pid_file, 'w') - + open(pid_file, "w") + with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = trapd_exit.cleanup_and_exit(0,pid_file) + result = trapd_exit.cleanup_and_exit(0, pid_file) assert pytest_wrapped_sys_exit.type == SystemExit assert pytest_wrapped_sys_exit.value.code == 0 # compare = str(result).startswith("SystemExit: 0") # self.assertEqual(compare, True) - + def test_abnormal_exit(self): """ Test exit with missing PID file exits non-zero """ with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = trapd_exit.cleanup_and_exit(0,pid_file_dne) + result = trapd_exit.cleanup_and_exit(0, pid_file_dne) assert pytest_wrapped_sys_exit.type == SystemExit assert pytest_wrapped_sys_exit.value.code == 1 -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py index 67bccfc..b7c5d7b 100644 --- a/tests/test_trapd_get_cbs_config.py +++ b/tests/test_trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -41,16 +41,16 @@ except Exception as e: # env var for CBS_SIM_JSON try: - os.environ['CBS_SIM_JSON'] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" + os.environ["CBS_SIM_JSON"] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" except Exception as e: print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) sys.exit(1) -pytest_json_data = "{ \"snmptrapd\": { \"version\": \"1.4.0\", \"title\": \"ONAP SNMP Trap Receiver\" }, \"protocols\": { \"transport\": \"udp\", \"ipv4_interface\": \"0.0.0.0\", \"ipv4_port\": 6162, \"ipv6_interface\": \"::1\", \"ipv6_port\": 6162 }, \"cache\": { \"dns_cache_ttl_seconds\": 60 }, \"publisher\": { \"http_timeout_milliseconds\": 1500, \"http_retries\": 3, \"http_milliseconds_between_retries\": 750, \"http_primary_publisher\": \"true\", \"http_peer_publisher\": \"unavailable\", \"max_traps_between_publishes\": 10, \"max_milliseconds_between_publishes\": 10000 }, \"streams_publishes\": { \"sec_fault_unsecure\": { \"type\": \"message_router\", \"aaf_password\": null, \"dmaap_info\": { \"location\": \"mtl5\", \"client_id\": null, \"client_role\": null, \"topic_url\": \"http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP\" }, \"aaf_username\": null } }, \"files\": { \"runtime_base_dir\": \"/tmp/opt/app/snmptrap\", \"log_dir\": \"logs\", \"data_dir\": \"data\", \"pid_dir\": \"tmp\", \"arriving_traps_log\": \"snmptrapd_arriving_traps.log\", \"snmptrapd_diag\": \"snmptrapd_prog_diag.log\", \"traps_stats_log\": \"snmptrapd_stats.csv\", \"perm_status_file\": \"snmptrapd_status.log\", \"eelf_base_dir\": \"/tmp/opt/app/snmptrap/logs\", \"eelf_error\": \"error.log\", \"eelf_debug\": \"debug.log\", \"eelf_audit\": \"audit.log\", \"eelf_metrics\": \"metrics.log\", \"roll_frequency\": \"day\", \"minimum_severity_to_log\": 2 }, \"trap_config\": { \"sw_interval_in_seconds\": 60, \"notify_oids\": { \".1.3.6.1.4.1.9.0.1\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.2\": { \"sw_high_water_in_interval\": 101, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.3\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.4\": { \"sw_high_water_in_interval\": 10, \"sw_low_water_in_interval\": 3, \"category\": \"logonly\" } } }, \"snmpv3_config\": { \"usm_users\": [ { \"user\": \"usr-sha-aes256\", \"engineId\": \"8000000001020304\", \"usmHMACSHAAuth\": \"authkey1\", \"usmAesCfb256\": \"privkey1\" }, { \"user\": \"user1\", \"engineId\": \"8000000000000001\", \"usmHMACMD5Auth\": \"authkey1\", \"usmDESPriv\": \"privkey1\" }, { \"user\": \"user2\", \"engineId\": \"8000000000000002\", \"usmHMACSHAAuth\": \"authkey2\", \"usmAesCfb128\": \"privkey2\" }, { \"user\": \"user3\", \"engineId\": \"8000000000000003\", \"usmHMACSHAAuth\": \"authkey3\", \"usmAesCfb256\": \"privkey3\" } ] } }" +pytest_json_data = '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' # create snmptrapd.json for pytest -pytest_json_config = os.getenv('CBS_SIM_JSON') -with open(pytest_json_config, 'w') as outfile: +pytest_json_config = os.getenv("CBS_SIM_JSON") +with open(pytest_json_config, "w") as outfile: outfile.write(pytest_json_data) outfile.close() @@ -65,7 +65,7 @@ class test_get_cbs_config(unittest.TestCase): Test that CBS fallback env variable exists and we can get config from fallback env var """ - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/snmptrapd.json') + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") result = trapd_get_cbs_config.get_cbs_config() print("result: %s" % result) # compare = str(result).startswith("{'snmptrap': ") @@ -73,9 +73,8 @@ class test_get_cbs_config(unittest.TestCase): self.assertEqual(result, True) def test_cbs_override_env_invalid(self): - """ - """ - os.environ.update(CBS_SIM_JSON='/tmp/opt/app/snmptrap/etc/nosuchfile.json') + """ """ + os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/nosuchfile.json") # result = trapd_get_cbs_config.get_cbs_config() # print("result: %s" % result) # compare = str(result).startswith("{'snmptrap': ") @@ -85,14 +84,14 @@ class test_get_cbs_config(unittest.TestCase): result = trapd_get_cbs_config.get_cbs_config() assert pytest_wrapped_sys_exit.type == SystemExit # assert pytest_wrapped_sys_exit.value.code == 1 - + def test_cbs_env_present(self): """ Test that CONSUL_HOST env variable exists but fails to respond """ - os.environ.update(CONSUL_HOST='localhost') - del os.environ['CBS_SIM_JSON'] + os.environ.update(CONSUL_HOST="localhost") + del os.environ["CBS_SIM_JSON"] # result = trapd_get_cbs_config.get_cbs_config() # print("result: %s" % result) # compare = str(result).startswith("{'snmptrap': ") @@ -100,16 +99,16 @@ class test_get_cbs_config(unittest.TestCase): with pytest.raises(SystemExit) as sys_exit: trapd_get_cbs_config.get_cbs_config() - assert sys_exit.value.errno == errno.ECONNREFUSED - + assert sys_exit.value.errno == errno.ECONNREFUSED + def test_cbs_override_env_undefined(self): - """ - """ + """ """ print("------>>> RUNNING test_no_cbs_override_env_var:") - del os.environ['CBS_SIM_JSON'] - + del os.environ["CBS_SIM_JSON"] + with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: assert trapd_get_cbs_config.get_cbs_config() == SystemExit - -if __name__ == '__main__': + + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py index 478b29d..7ec89a6 100644 --- a/tests/test_trapd_http_session.py +++ b/tests/test_trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,20 +17,21 @@ import pytest import unittest import trapd_http_session - + + class test_init_session_obj(unittest.TestCase): """ Test the init_session_obj mod """ - + def close_nonexisting_session(self): """ test close of existing http session """ - sess="no session" + sess = "no session" result = trapd_http_session.close_session_obj(sess) self.assertEqual(result, True) - + def init_session(self): """ test creation of http session @@ -38,7 +39,7 @@ class test_init_session_obj(unittest.TestCase): result = trapd_http_session.init_session_obj() compare = str(result).startswith("