diff options
29 files changed, 2010 insertions, 650 deletions
diff --git a/Changelog.md b/Changelog.md index 78b3a8a..c102c46 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [2.0.7] - 2022/08/17 +- [DCAEGEN2-3158] CodeCoverage improvement for dcaegen2-collectors-snmptrap (60% to 90%) + ## [2.0.6] - 2021/10/26 ### Changed * [DCAEGEN2-2957] SNMP Trap collector - STDOUT complaince @@ -1,3 +1,4 @@ runtests: - . ~/bin/set_proxies; tox tests | cat - coverage html + -[ -d /tmp/opt ] && find /tmp/opt -type d -exec chmod 755 {} + + rm -rf /tmp/opt + tox tests | cat @@ -1,7 +1,7 @@ <?xml version="1.0"?> <!-- -================================================================================ -Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +============LICENSE_START======================================================= +Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. <groupId>org.onap.dcaegen2.collectors</groupId> <artifactId>snmptrap</artifactId> <name>dcaegen2-collectors-snmptrap</name> - <version>2.0.6-SNAPSHOT</version> + <version>2.0.7-SNAPSHOT</version> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2021 Deutsche Telekom. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,7 +25,7 @@ from setuptools import setup, find_packages setup( name="snmptrap", description="snmp trap receiver for ONAP docker image", - version="2.0.6", + version="2.0.7", packages=find_packages(), install_requires=["pysnmp==4.4.12", "requests==2.18.3", "onap_dcae_cbs_docker_client==2.2.1", "pyyaml"], author="Dave L", diff --git a/snmptrap/makefile b/snmptrap/makefile new file mode 100644 index 0000000..aa9d159 --- /dev/null +++ b/snmptrap/makefile @@ -0,0 +1,2 @@ +runtests: + cd .. && $(MAKE) runtests diff --git a/snmptrap/mod/trapd_exit.py b/snmptrap/mod/trapd_exit.py index 1d1ea16..0959399 100644 --- a/snmptrap/mod/trapd_exit.py +++ b/snmptrap/mod/trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,13 +46,8 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name): none :Keywords: runtime PID exit - :Variables: - _num_params - number of parameters passed to module """ - # _num_params = len(locals()) - if _pid_file_name is not None: rc = rm_pid(_pid_file_name) sys.exit(_loc_exit_code) diff --git a/snmptrap/mod/trapd_get_cbs_config.py b/snmptrap/mod/trapd_get_cbs_config.py index cfae994..9ed7916 100644 --- a/snmptrap/mod/trapd_get_cbs_config.py +++ b/snmptrap/mod/trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,28 +64,23 @@ def get_cbs_config(): msg = "ONAP controller not present, trying json config override via CBS_SIM_JSON env variable" stdout_logger(msg) - try: - _cbs_sim_json_file = os.getenv("CBS_SIM_JSON", "None") - except Exception as e: + _cbs_sim_json_file = os.getenv("CBS_SIM_JSON") + + if _cbs_sim_json_file is None: msg = "CBS_SIM_JSON not defined - FATAL ERROR, exiting" stdout_logger(msg) cleanup_and_exit(1, None) - if _cbs_sim_json_file == "None": - msg = "CBS_SIM_JSON not defined - FATAL ERROR, exiting" + msg = "ONAP controller override specified via CBS_SIM_JSON: %s" % _cbs_sim_json_file + stdout_logger(msg) + try: + tds.c_config = json.load(open(_cbs_sim_json_file)) + msg = "%s loaded and parsed successfully" % _cbs_sim_json_file stdout_logger(msg) - cleanup_and_exit(1, None) - else: - msg = "ONAP controller override specified via CBS_SIM_JSON: %s" % _cbs_sim_json_file + except Exception as e: + msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting" stdout_logger(msg) - try: - tds.c_config = json.load(open(_cbs_sim_json_file)) - msg = "%s loaded and parsed successfully" % _cbs_sim_json_file - stdout_logger(msg) - except Exception as e: - msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting" - stdout_logger(msg) - cleanup_and_exit(1, None) + cleanup_and_exit(1, None) # display consul config returned, regardless of source msg = "cbs config: %s" % json.dumps(tds.c_config) diff --git a/snmptrap/mod/trapd_http_session.py b/snmptrap/mod/trapd_http_session.py index 7118d00..1c93edc 100644 --- a/snmptrap/mod/trapd_http_session.py +++ b/snmptrap/mod/trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,6 +23,9 @@ __docformat__ = "restructuredtext" import os import requests import traceback +from trapd_io import ecomp_logger, stdout_logger +import trapd_settings as tds +from trapd_exit import cleanup_and_exit prog_name = os.path.basename(__file__) diff --git a/snmptrap/mod/trapd_io.py b/snmptrap/mod/trapd_io.py index f7c48b7..2ba44b4 100644 --- a/snmptrap/mod/trapd_io.py +++ b/snmptrap/mod/trapd_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -54,15 +54,19 @@ def roll_all_logs(): # NOTE: this will go away when onap logging is standardized/available try: # open various ecomp logs - if any fails, exit - for fd in [ - tds.eelf_error_fd, - tds.eelf_debug_fd, - tds.eelf_audit_fd, - tds.eelf_metrics_fd, - tds.arriving_traps_fd, - tds.json_traps_fd, + for fd, nm in [ + (tds.eelf_error_fd, "error"), + (tds.eelf_debug_fd, "debug"), + (tds.eelf_audit_fd, "eelf"), + (tds.eelf_metrics_fd, "metrics"), + (tds.arriving_traps_fd, "arriving_traps"), + (tds.json_traps_fd, "json_traps"), ]: - fd.close() + if fd is None: + msg = "Error closing log file for " + nm + stdout_logger(msg) + else: + fd.close() roll_file(tds.eelf_error_file_name) roll_file(tds.eelf_debug_file_name) @@ -86,7 +90,7 @@ def roll_all_logs(): try: tds.json_traps_fd = open_file(tds.json_traps_filename) except Exception as e: - msg = "Error opening json_log %s : %s" % (json_traps_filename, str(e)) + msg = "Error opening json_log %s : %s" % (tds.json_traps_filename, str(e)) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -96,7 +100,7 @@ def roll_all_logs(): try: tds.arriving_traps_fd = open_file(tds.arriving_traps_filename) except Exception as e: - msg = "Error opening arriving traps %s : %s" % (arriving_traps_filename, str(e)) + msg = "Error opening arriving traps %s : %s" % (tds.arriving_traps_filename, str(e)) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -119,6 +123,7 @@ def open_eelf_logs(): tds.eelf_error_fd = open_file(tds.eelf_error_file_name) except Exception as e: + # here if we cannot create the filename msg = "Error opening eelf error log : " + str(e) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -128,6 +133,7 @@ def open_eelf_logs(): tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name) except Exception as e: + # here if we cannot create the filename msg = "Error opening eelf debug log : " + str(e) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -136,6 +142,7 @@ def open_eelf_logs(): tds.eelf_audit_file_name = tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_audit"] tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name) except Exception as e: + # here if we cannot create the filename msg = "Error opening eelf audit log : " + str(e) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -146,6 +153,7 @@ def open_eelf_logs(): ) tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name) except Exception as e: + # here if we cannot create the filename msg = "Error opening eelf metric log : " + str(e) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) @@ -202,15 +210,14 @@ def open_file(_loc_file_name): stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) - # # # # # # # # # # # # # - # fx: close_file - # # # # # # # # # # # # # - """ - close _loc_file_name, return True with success, False otherwise - """ + +# # # # # # # # # # # # # +# fx: close_file +# # # # # # # # # # # # # def close_file(_loc_fd, _loc_filename): + """ close _loc_file_name, return True with success, False otherwise """ try: _loc_fd.close() diff --git a/snmptrap/mod/trapd_runtime_pid.py b/snmptrap/mod/trapd_runtime_pid.py index c647885..0a9eac6 100644 --- a/snmptrap/mod/trapd_runtime_pid.py +++ b/snmptrap/mod/trapd_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -53,11 +53,7 @@ def save_pid(_pid_file_name): except IOError: print("IOError saving PID file %s :" % _pid_file_name) return False - # except: - # print("Error saving PID file %s :" % _pid_file_name) - # return False else: - # print("Runtime PID file: %s" % _pid_file_name) return True @@ -76,7 +72,6 @@ def rm_pid(_pid_file_name): :Keywords: pid /var/run """ - try: if os.path.isfile(_pid_file_name): os.remove(_pid_file_name) @@ -84,6 +79,6 @@ def rm_pid(_pid_file_name): else: return False - except IOError: + except: print("Error removing Runtime PID file: %s" % _pid_file_name) return False diff --git a/snmptrap/mod/trapd_settings.py b/snmptrap/mod/trapd_settings.py index 26e7c9b..33cf56b 100644 --- a/snmptrap/mod/trapd_settings.py +++ b/snmptrap/mod/trapd_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -114,7 +114,7 @@ def init(): global json_traps_filename json_traps_filename = "" global json_traps_fd - json_fd = None + json_traps_fd = None # </json log of traps published> # <log of arriving traps > diff --git a/snmptrap/mod/trapd_snmpv3.py b/snmptrap/mod/trapd_snmpv3.py index 56ecb5c..1a1cc0c 100644 --- a/snmptrap/mod/trapd_snmpv3.py +++ b/snmptrap/mod/trapd_snmpv3.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -82,8 +82,6 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): # find options at -> site-packages/pysnmp/entity/config.py # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - # print("Checking auth for %s" % (userName)) - # usmHMACMD5AuthProtocol try: authKey = v3_user["usmHMACMD5AuthProtocol"] @@ -123,8 +121,6 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config): # find options at -> site-packages/pysnmp/entity/config.py # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - # print("Checking priv for %s" % (userName)) - # usm3DESEDEPriv try: privKey = v3_user["usm3DESEDEPrivProtocol"] diff --git a/snmptrap/mod/trapd_stormwatch.py b/snmptrap/mod/trapd_stormwatch.py index 4c374bb..d0ff5a7 100644 --- a/snmptrap/mod/trapd_stormwatch.py +++ b/snmptrap/mod/trapd_stormwatch.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -65,8 +65,8 @@ def sw_init(): # # # # # # # # # # # # # -# fx: sw_storm_active -# - check if storm is active for agent/oid +# fx: sw_clear_dicts +# - clear out all dictionaries in stats # - returns True if yes, False if no # # # # # # # # # # # # # def sw_clear_dicts(): @@ -97,7 +97,9 @@ def sw_clear_dicts(): if hasattr(sws, "sw_config_category"): sws.sw_config_category.clear() return True + except Exception as e: + print(f">>>> got exception {e}") msg = "unable to reset stormwatch dictionaries - results will be indeterminate: %s" % (e) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) return False @@ -123,7 +125,8 @@ def sw_load_trap_config(_config): ret = sw_clear_dicts() msg = "reset existing sws dictionaries to empty" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - except NameError: + + except (NameError, AttributeError): msg = "sws dictionaries not present - initializing" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) ret = sw_init() @@ -138,6 +141,7 @@ def sw_load_trap_config(_config): ) msg = "metric_log_notification_threshold_pct value: %d" % stats.metric_log_notification_threshold_pct ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + except Exception as e: msg = "metric_log_notification_threshold_pct not present in config - default to 25" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) @@ -148,6 +152,7 @@ def sw_load_trap_config(_config): sws.sw_interval_in_seconds = int(_config["trap_config"]["sw_interval_in_seconds"]) msg = "sw_interval_in_seconds value: %d" % sws.sw_interval_in_seconds ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + except Exception as e: msg = "sw_interval_in_seconds not present in config - default to 60 seconds" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) @@ -156,16 +161,18 @@ def sw_load_trap_config(_config): # add trap configs from CBS json structure to running config try: notify_oids = _config["trap_config"]["notify_oids"] + except Exception as e: msg = "no trap_config or notify_oids defined" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) - return False + return 0 trap_block_counter = 0 for trap_block in notify_oids: # oid try: _oid = trap_block["oid"] + except Exception as e: msg = "missing oid value in notify_oids - oid section of CBS config - using empty value, disregard entry" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) @@ -174,6 +181,7 @@ def sw_load_trap_config(_config): # sw_high_water_in_interval try: _sw_high_water_in_interval = int(trap_block["sw_high_water_in_interval"]) + except Exception as e: msg = ( "missing sw_high_water_in_interval value in notify_oids - oid section of CBS config - using empty value" @@ -184,6 +192,7 @@ def sw_load_trap_config(_config): # sw_low_water_in_interval try: _sw_low_water_in_interval = int(trap_block["sw_low_water_in_interval"]) + except Exception as e: msg = ( "missing sw_low_water_in_interval value in notify_oids - oid section of CBS config - using empty value" @@ -194,6 +203,7 @@ def sw_load_trap_config(_config): # category try: _category = trap_block["category"] + except Exception as e: msg = "missing category value in notify_oids - oid section of CBS config - using empty value" ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg) diff --git a/snmptrap/scheduler.sh b/snmptrap/scheduler.sh index bb2d083..bf989b9 100755 --- a/snmptrap/scheduler.sh +++ b/snmptrap/scheduler.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash # ============LICENSE_START======================================================= -# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -56,10 +56,10 @@ _verbosity=$2 _log_message=$3 echo "`date` `date +%s` ${_fx} ${_error_code} ${_log_message}" >> ${log_fd} - + # log_lines=`wc -l ${log_fd} | awk {'print $1'} | bc` # log_lines=$((${log_lines}+1)) - + # if [ ${log_lines} -ge ${max_log_lines} ] # then # if [ -f ${log_fd} ] @@ -178,9 +178,9 @@ do run_daily_jobs # reset last_day last_day=${current_day} - fi - fi - fi + fi + fi + fi fi sleep ${sleep_time} done diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py index ddb6b32..d8d86e5 100644 --- a/snmptrap/snmptrapd.py +++ b/snmptrap/snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -167,6 +167,7 @@ def load_all_configs(_signum, _frame): def resolve_ip(_loc_ip_addr_str): try: + if int(tds.dns_cache_ip_expires[_loc_ip_addr_str] < int(time.time())): raise Exception( "cache expired for %s at %d - updating value" @@ -515,20 +516,21 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): # tds.trap_dict["notify OID"] = "." + str(enterprise) + ".0." + str(specific_trap) # tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count('.') # tds.trap_dict["sysUptime"] = variables['pdu']['time-stamp'].prettyPrint() + + elif snmp_version == 2: + tds.trap_dict["protocol version"] = "v2c" + + elif snmp_version == 3: + tds.trap_dict["protocol version"] = "v3" + # tds.trap_dict["security level"] = str(variables['securityLevel']) + # tds.trap_dict["context name"] = str( + # variables['contextName'].prettyPrint()) + # tds.trap_dict["security name"] = str(variables['securityName']) + # tds.trap_dict["security engine"] = str( + # variables['contextEngineId'].prettyPrint()) + else: - if snmp_version == 2: - tds.trap_dict["protocol version"] = "v2c" - else: - if snmp_version == 3: - tds.trap_dict["protocol version"] = "v3" - # tds.trap_dict["security level"] = str(variables['securityLevel']) - # tds.trap_dict["context name"] = str( - # variables['contextName'].prettyPrint()) - # tds.trap_dict["security name"] = str(variables['securityName']) - # tds.trap_dict["security engine"] = str( - # variables['contextEngineId'].prettyPrint()) - else: - tds.trap_dict["protocol version"] = "unknown" + tds.trap_dict["protocol version"] = "unknown" # tds.trap_dict['time received'] = epoch_msecond tds.trap_dict["time received"] = epoch_second @@ -672,15 +674,6 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, msg = "processing varbinds for %s" % (tds.trap_dict["uuid"]) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - # help(snmp_engine) - # print(snmp_engine) - # help(varBinds) - # print(varBinds) - # help(cbCtx) - # print(cbCtx) - # for key, val in cbCtx: - # print(key, val) - # FMDL update reset location when batching publishes pdu_varbind_count = 0 payload_varbinds = 0 @@ -735,6 +728,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, if tds.first_trap: tds.all_traps_json_str = curr_trap_json_str tds.trap_uuids_in_buffer = tds.trap_dict["uuid"] + tds.first_trap = False else: tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + ", " + tds.trap_dict["uuid"] diff --git a/snmptrap/snmptrapd.sh b/snmptrap/snmptrapd.sh index 717f660..6755c0c 100755 --- a/snmptrap/snmptrapd.sh +++ b/snmptrap/snmptrapd.sh @@ -2,7 +2,7 @@ # -*- indent-tabs-mode: nil -*- # vi: set expandtab: # # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -66,7 +66,7 @@ fi export PYTHONPATH=${bin_base_dir}/mod:$PYTHONPATH # PYTHONUNBUFFERED: -# set PYTHONUNBUFFERED to a non-empty string to avoid output buffering; +# set PYTHONUNBUFFERED to a non-empty string to avoid output buffering; # comment out for runtime environments/better performance! # export PYTHONUNBUFFERED="True" @@ -77,14 +77,14 @@ export CBS_SIM_JSON=${base_dir}/etc/snmptrapd.json # misc exit_after=1 -# # # # # # # # # # +# # # # # # # # # # # log_msg - log messages to stdout in standard manner -# # # # # # # # # # +# # # # # # # # # # log_msg() { msg=$* - echo "`date +%Y-%m-%dT%H:%M:%S,%N | cut -c1-23` ${msg}" + echo "`date +%Y-%m-%dT%H:%M:%S,%N | cut -c1-23` ${msg}" } # @@ -96,7 +96,7 @@ process_name=$1 pid_file=$2 exec_cmd=$3 - # check if exec_cmd has a pid_file + # check if exec_cmd has a pid_file if [ ! -r ${pid_file} ] then log_msg "Starting ${process_name}" @@ -123,13 +123,13 @@ exec_cmd=$3 fi } -# # # # # # # # # # +# # # # # # # # # # # Start the service -# # # # # # # # # # +# # # # # # # # # # start_service() { # Hints for startup modifications: - # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ # handy for debug (e.g. docker logs output) # log_msg "Runtime env present for ${current_module} placed in ${base_dir}/logs/${current_module}.out" @@ -165,9 +165,9 @@ start_service() fi } -# # # # # # # # # # +# # # # # # # # # # # Stop the service -# # # # # # # # # # +# # # # # # # # # # stop_process() { process_name=$1 @@ -337,7 +337,7 @@ case "$1" in status_service wait ;; - "stop") + "stop") version stop_service ;; @@ -348,7 +348,7 @@ case "$1" in start_service status_service ;; - "status") + "status") version status_service ;; diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py index dee2aa0..736e114 100644 --- a/tests/test_snmptrapd.py +++ b/tests/test_snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,17 @@ # limitations under the License. # ============LICENSE_END========================================================= +import copy +import datetime import os -import pytest import unittest +from pathlib import Path +import time +from unittest.mock import patch, Mock + +import requests + import snmptrapd -import datetime import trapd_settings as tds import trapd_stormwatch_settings as sws @@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase): Test the save_pid mod """ - pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' + class WriteThrows(): + def write(): + raise RuntimeError("write() throws") + + + @classmethod + def setUpClass(cls): + + # init vars + tds.init() + sw.sw_init() + + # fmt: off + test_snmptrapd.pytest_empty_data = "{}" + test_snmptrapd.pytest_json_data = ( + '{ "snmptrapd": { ' + ' "version": "2.0.3", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + ' "protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + ' "cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + ' "publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + ' "streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + ' "files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + ' "trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + ' "snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + ' ] } }' + ) + # fmt: off + + test_snmptrapd.trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "notify OID len": 10, + } + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # create copy of snmptrapd.json for pytest + test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" + with open(test_snmptrapd.pytest_json_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_json_data) + + test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json" + with open(test_snmptrapd.pytest_empty_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_empty_data) - # create copy of snmptrapd.json for pytest - pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" - with open(pytest_json_config, "w") as outfile: - outfile.write(pytest_json_data) def test_usage_err(self): """ Test usage error """ - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = snmptrapd.usage_err() - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 1 + with self.assertRaises(SystemExit) as exc: + snmptrapd.usage_err() + self.assertEqual(str(exc.exception), "1") + def test_load_all_configs(self): """ Test load of all configs """ + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) - # init vars - tds.init() - sw.sw_init() + result = trapd_get_cbs_config.get_cbs_config() + self.assertEqual(result, True) - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + # request load of CBS data + self.assertEqual(snmptrapd.load_all_configs(0, 1), True) + + + def test_resolve_ip(self): + """ Test resolve_ip """ + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + time_base = 1000000 + time_offset = 10000 + with patch('time.time', return_value=time_base): + self.assertEqual(time.time(), time_base) + + fqdn = "foo.example" + ip = "1.2.3.4" + with patch.dict(tds.dns_cache_ip_to_name): + tds.dns_cache_ip_to_name = { } + # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires + # and gethostbyaddr() fails + with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")): + self.assertEqual(snmptrapd.resolve_ip(ip), ip) + self.assertEqual(tds.dns_cache_ip_to_name[ip], ip) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + tds.dns_cache_ip_to_name = { ip: fqdn } + with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])): + # EXCEPTION - nothing in tds.dns_cache_ip_expires + del tds.dns_cache_ip_expires[ip] + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset) - # request load of CBS data - result = snmptrapd.load_all_configs(0, 1) - self.assertEqual(result, True) def test_load_all_configs_signal(self): """ @@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + # request load of CBS data + self.assertTrue(snmptrapd.load_all_configs(1, 1)) + + with patch('snmptrapd.get_cbs_config', return_value=False): + with self.assertRaises(SystemExit): + snmptrapd.load_all_configs(1, 1) - # request load of CBS data - result = snmptrapd.load_all_configs(1, 1) - self.assertEqual(result, True) def test_log_all_arriving_traps(self): """ Test logging of traps """ - # init vars tds.init() - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - - # set last day to current - tds.last_day = datetime.datetime.now().day - - # trap dict for logging - tds.trap_dict = { - "uuid": "06f6e91c-3236-11e8-9953-005056865aac", - "agent address": "1.2.3.4", - "agent name": "test-agent.nodomain.com", - "cambria.partition": "test-agent.nodomain.com", - "community": "", - "community len": 0, - "epoch_serno": 15222068260000, - "protocol version": "v2c", - "time received": 1522206826.2938566, - "trap category": "ONAP-COLLECTOR-SNMPTRAP", - "sysUptime": "218567736", - "notify OID": "1.3.6.1.4.1.9999.9.9.999", - "notify OID len": 10, - } + # don't open files, but try to log - should raise exception + with self.assertRaises(Exception) as exc: + snmptrapd.log_all_arriving_traps() + self.assertIsInstance(exc.exception, TypeError) - # open eelf logs - trapd_io.open_eelf_logs() - - # open trap logs - tds.arriving_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + (tds.c_config["files"]["arriving_traps_log"]) - ) - tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) - - # name and open json trap log - tds.json_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + "DMAAP_" - + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) - + ".json" - ) - tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) - msg = "published traps logged to: %s" % tds.json_traps_filename - trapd_io.stdout_logger(msg) - trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + result = trapd_get_cbs_config.get_cbs_config() + + # set last day to current + tds.last_day = datetime.datetime.now().day + + + # open eelf logs + trapd_io.open_eelf_logs() + + # open trap logs + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + (tds.c_config["files"]["arriving_traps_log"]) + ) + tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) + + # name and open json trap log + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + "DMAAP_" + + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) + + ".json" + ) + tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) + msg = "published traps logged to: %s" % tds.json_traps_filename + trapd_io.stdout_logger(msg) + trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # also force it to daily roll + snmptrapd.log_all_arriving_traps() + + # try again, but with day rolling + tds.last_day = datetime.datetime.now().day - 1 + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to minute + tds.last_minute = datetime.datetime.now().minute - 1 + tds.c_config["files"]["roll_frequency"] = "minute" + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to hour + tds.last_hour = datetime.datetime.now().hour - 1 + tds.c_config["files"]["roll_frequency"] = "hour" + snmptrapd.log_all_arriving_traps() + + # try again, with a bad trap_dict[time_received] + tds.trap_dict["time received"] = "bad_value" + snmptrapd.log_all_arriving_traps() + + # also test log_published_messages() + snmptrapd.log_published_messages("data #1") + + # work even if there is an exception + # this SHOULD be done with a context + sv_json_traps_fd = tds.json_traps_fd + tds.json_traps_fd = test_snmptrapd.WriteThrows() + snmptrapd.log_published_messages("data #2") + tds.json_traps_fd = sv_json_traps_fd - # don't open files, but try to log - should raise exception - with pytest.raises(Exception) as pytest_wrapped_exception: - result = snmptrapd.log_all_arriving_traps() - assert pytest_wrapped_exception.type == AttributeError def test_log_all_incorrect_log_type(self): """ @@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + # open eelf logs + trapd_io.open_eelf_logs() - # open eelf logs - trapd_io.open_eelf_logs() def test_v1_trap_receipt(self): """ @@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() - - errorIndication, errorStatus, errorIndex, varbinds = next( - sendNotification( - SnmpEngine(), - CommunityData("not_public"), - UdpTransportTarget(("localhost", 6162)), - ContextData(), - "trap", - [ - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), - ], + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("not_public"), + UdpTransportTarget(("localhost", 6162)), + ContextData(), + "trap", + [ + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), + ], + ) ) - ) - result = errorIndication - self.assertEqual(result, None) + result = errorIndication + self.assertEqual(result, None) + + + def test_post_dmaap(self): + """ + Test post_dmaap() + """ + + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger: + with patch('requests.Session.post') as magic_session_post: + fake_post_resp = Mock() + magic_session_post.return_value = fake_post_resp + + fake_post_resp.status_code = requests.codes.moved_permanently + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 11) + + fake_post_resp.status_code = requests.codes.ok + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 16) + + magic_ecomp_logger.call_count = 0 + tds.traps_since_last_publish = 1 + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + magic_ecomp_logger.call_count = 0 + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username" + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password" + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + # for some reason this exception is being seen as an OSError ???? + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = requests.exceptions.RequestException("test throw") + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = OSError() + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + + @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method") + def test_comm_string_rewrite_observer(self): + """ + test comm_string_rewrite_observer() + """ + vars = { "communityName": ["name"] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "public") + + vars = { "communityName": [] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "") + + + def test_snmp_engine_observer_cb(self): + """ + test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): + """ + + snmp_engine = "snmp_engine" + execpoint = "execpoint" + cbCtx = "cbCtx" + variables = { + 'transportDomain': [ 1, 2, 3 ], + 'transportAddress': [ "a", "b", "c" ] + } + for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]: + variables["securityModel"] = secmodel + snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx) + self.assertEqual(tds.trap_dict["protocol version"], ret) -if __name__ == "__main__": + def test_add_varbind_to_log_string(self): + """ + test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val) + """ + vb_oid = "vb_oid" + vb_type = "vb_type" + + class TempPP(): + def prettyPrint(self): + return "pp ret" + + vb_val = TempPP() + + self.assertEqual(tds.all_vb_str, "") + + snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret') + + snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret') + + + def test_add_varbind_to_json(self): + """ + test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val) + """ + + class TempPP(): + def __init__(self, ret): + self.ret = ret + def prettyPrint(self): + return self.ret + + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + vb_oid = TempPP("1.2.3") + override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0") + + vb_type = "vb_type" + + vb_val = TempPP("foo.example") + + self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["notify OID"], ".foo.example") + self.assertEqual(tds.trap_dict["notify OID len"], 2) + + with patch('snmptrapd.resolve_ip') as magic_resolve_ip: + magic_resolve_ip.return_value = 'foo.example' + self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["agent address"], "foo.example") + self.assertEqual(tds.trap_dict["agent name"], "foo.example") + + sv_protocol_version = tds.trap_dict["protocol version"] + tds.trap_dict["protocol version"] = "v1" + self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1) + + tds.trap_dict["protocol version"] = sv_protocol_version + self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + + @patch('snmptrapd.log_all_arriving_traps') + @patch('snmptrapd.post_dmaap', return_value = 0) + @patch('snmptrapd.ecomp_logger', return_value = 0) + @patch('snmptrapd.add_varbind_to_json', return_value = 1) + @patch('snmptrapd.add_varbind_to_log_string', return_value = 0) + def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json, + magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps): + """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """ + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('trapd_stormwatch.sw_storm_active', return_value=True): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertFalse(tds.first_trap) + self.assertEqual(magic_ecomp_logger.call_count, 8) + + magic_ecomp_logger.call_count = 0 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 1 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 100 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0 + tds.last_pub_time = 0 + with patch('time.time', return_value=0): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.last_pub_time = 100000 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1 + with patch('time.time', return_value=10): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py index 0e38461..371b308 100644 --- a/tests/test_trapd_exit.py +++ b/tests/test_trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -29,27 +28,26 @@ class test_cleanup_and_exit(unittest.TestCase): def test_normal_exit(self): """ - Test normal exit works as expected + Test normal exit works as expected, and exits with the 1st arg """ - open(pid_file, "w") + # create an empty pid file + with open(pid_file, "w"): + pass - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + with self.assertRaises(SystemExit) as exc: result = trapd_exit.cleanup_and_exit(0, pid_file) - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 0 + self.assertEqual(str(exc.exception), "0") - # compare = str(result).startswith("SystemExit: 0") - # self.assertEqual(compare, True) def test_abnormal_exit(self): """ - Test exit with missing PID file exits non-zero + Test exit with missing PID file. Still exits with the 1st arg. """ - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + + with self.assertRaises(SystemExit) as exc: result = trapd_exit.cleanup_and_exit(0, pid_file_dne) - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 1 + self.assertEqual(str(exc.exception), "0") -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py index b7c5d7b..4064b78 100644 --- a/tests/test_trapd_get_cbs_config.py +++ b/tests/test_trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,101 +14,198 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest -import unittest +import json import os import sys +import unittest +from unittest.mock import patch +from pathlib import Path -from onap_dcae_cbs_docker_client.client import get_config -from trapd_exit import cleanup_and_exit -from trapd_io import stdout_logger, ecomp_logger -import trapd_settings as tds import trapd_get_cbs_config -from pathlib import Path -# # # # # # # # -# ENV setup -# # # # # # # # - -# required directory tree -try: - Path("/tmp/opt/app/snmptrap/logs").mkdir(parents=True, exist_ok=True) - Path("/tmp/opt/app/snmptrap/tmp").mkdir(parents=True, exist_ok=True) -except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) - -# env var for CBS_SIM_JSON -try: - os.environ["CBS_SIM_JSON"] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" -except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) - -pytest_json_data = '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' - -# create snmptrapd.json for pytest -pytest_json_config = os.getenv("CBS_SIM_JSON") -with open(pytest_json_config, "w") as outfile: - outfile.write(pytest_json_data) -outfile.close() - -# test class/methods -class test_get_cbs_config(unittest.TestCase): +class test_trapd_get_cbs_config(unittest.TestCase): """ Test the trapd_get_cbs_config mod """ + snmptrap_dir = "/tmp/opt/app/snmptrap" + json_dir = snmptrap_dir + "/etc" + + # fmt: off + pytest_json_data = json.loads( + '{' + '"snmptrapd": { ' + ' "version": "1.4.0", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + '"protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + '"cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + '"publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + '"streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + '"files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + '"trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + '"snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + '] } }' + ) + # fmt: on + + + @classmethod + def setUpClass(cls): + """ set up the required directory tree """ + try: + Path(test_trapd_get_cbs_config.snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(test_trapd_get_cbs_config.snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(test_trapd_get_cbs_config.snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + + def write_config(self, filename, config): + """ + write a config file + """ + # create snmptrapd.json for pytest + with open(filename, "w") as outfile: + json.dump(config, outfile) + + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"}) def test_cbs_fallback_env_present(self): """ Test that CBS fallback env variable exists and we can get config from fallback env var """ - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, True) - self.assertEqual(result, True) + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json" + self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", test_trapd_get_cbs_config.pytest_json_data) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"}) + def test_cbs_fallback_env_present_bad_numbers(self): + """ + Test as in test_cbs_fallback_env_present(), but with + various values reset to be non-numeric. + """ + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json" + with patch.dict(test_trapd_get_cbs_config.pytest_json_data): + test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_milliseconds_between_retries"] = "notanumber" + test_trapd_get_cbs_config.pytest_json_data["files"]["minimum_severity_to_log"] = "notanumber" + test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_retries"] = "notanumber" + self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", + test_trapd_get_cbs_config.pytest_json_data) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/nosuchfile.json"}) def test_cbs_override_env_invalid(self): """ """ - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/nosuchfile.json") - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/nosuchfile.json" - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + with self.assertRaises(SystemExit) as exc: result = trapd_get_cbs_config.get_cbs_config() - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 + self.assertEqual(str(exc.exception), "1") + + @patch.dict(os.environ, {"CONSUL_HOST": "localhost"}) def test_cbs_env_present(self): """ Test that CONSUL_HOST env variable exists but fails to respond """ - os.environ.update(CONSUL_HOST="localhost") + self.assertEqual(os.getenv("CONSUL_HOST"), "localhost") + del os.environ["CBS_SIM_JSON"] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + self.assertNotIn("CBS_SIM_JSON", os.environ) - with pytest.raises(SystemExit) as sys_exit: + with self.assertRaises(SystemExit) as exc: trapd_get_cbs_config.get_cbs_config() - assert sys_exit.value.errno == errno.ECONNREFUSED + + @patch.dict(os.environ, {}) def test_cbs_override_env_undefined(self): """ """ - print("------>>> RUNNING test_no_cbs_override_env_var:") del os.environ["CBS_SIM_JSON"] + self.assertNotIn("CBS_SIM_JSON", os.environ) - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - assert trapd_get_cbs_config.get_cbs_config() == SystemExit + with self.assertRaises(SystemExit) as exc: + trapd_get_cbs_config.get_cbs_config() -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py index 7ec89a6..667a454 100644 --- a/tests/test_trapd_http_session.py +++ b/tests/test_trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,9 +14,11 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_http_session +import requests +from unittest.mock import Mock, patch +import trapd_settings as tds class test_init_session_obj(unittest.TestCase): @@ -24,39 +26,62 @@ class test_init_session_obj(unittest.TestCase): Test the init_session_obj mod """ - def close_nonexisting_session(self): + @classmethod + def setUpClass(cls): + tds.init() + + def test_init_session_obj(self): """ - test close of existing http session + test creation of http session """ - sess = "no session" - result = trapd_http_session.close_session_obj(sess) - self.assertEqual(result, True) + self.assertIsInstance(trapd_http_session.init_session_obj(), requests.sessions.Session) + - def init_session(self): + def test_init_session_obj_raises(self): """ - test creation of http session + test close when the requests.Session() method throws an exception + """ + with patch('requests.Session') as magic_requests: + magic_requests.side_effect = RuntimeError("Session() throws via mock") + with self.assertRaises(SystemExit) as exc: + trapd_http_session.init_session_obj() + self.assertEqual(str(exc.exception), "1") + + + def test_close_nonexisting_session(self): """ - result = trapd_http_session.init_session_obj() - compare = str(result).startswith("<requests.sessions.Session object at") - self.assertEqual(compare, True) + test close of non-existing http session + """ + self.assertIsNone(trapd_http_session.close_session_obj(None)) + + + def test_close_nonexisting_close_raises(self): + """ + test close when the session.close() method throws + """ + class CloseThrows(): + def close(self): + raise RuntimeError("close() throws") + + with self.assertRaises(SystemExit): + trapd_http_session.close_session_obj(CloseThrows()) + def test_reset(self): """ test reset of existing http session """ sess = trapd_http_session.init_session_obj() - result = trapd_http_session.reset_session_obj(sess) - compare = str(result).startswith("<requests.sessions.Session object at") - self.assertEqual(compare, True) + self.assertIsInstance(trapd_http_session.reset_session_obj(sess), requests.sessions.Session) + - def close_existing_session(self): + def test_close_existing_session(self): """ test close of existing http session """ sess = trapd_http_session.init_session_obj() - result = trapd_http_session.close_session_obj(sess) - self.assertEqual(result, True) + self.assertTrue(trapd_http_session.close_session_obj(sess)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_io.py b/tests/test_trapd_io.py index c1702aa..f3d1a4c 100644 --- a/tests/test_trapd_io.py +++ b/tests/test_trapd_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +14,21 @@ # limitations under the License. # ============LICENSE_END========================================================= +import datetime +import glob +import io +import json import os -import pytest +from pathlib import Path +import sys +import tempfile import unittest +from unittest.mock import patch + import snmptrapd -import datetime -import json import trapd_settings as tds import trapd_runtime_pid import trapd_io -import sys -from pathlib import Path class test_trapd_io(unittest.TestCase): @@ -32,32 +36,141 @@ class test_trapd_io(unittest.TestCase): Test the save_pid mod """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' - ) + class PseudoFile(): + """ test file-like object that does nothing """ + def write(self): + pass + def close(self): + pass - def test_roll_all_files_notopen(self): - """ - Test rolling files that aren't open - """ + class WriteThrows(): + """ test file-like object that throws on a write """ + def write(self): + raise RuntimeError("close() throws") + + + @classmethod + def setUpClass(cls): + + tds.init() + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # fmt: off + tds.c_config = json.loads( + '{ "snmptrapd": { ' + ' "version": "1.4.0", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + '"protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + '"cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + '"publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + '"streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + '"files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + '"trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + '"snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + '] } }' + ) + # fmt: on + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json" + ) + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log" + ) - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_error_file(self): """ Test bad error file location """ - # open eelf error logs - tds.c_config["files.eelf_error"] = "/bad_dir/error.log" + with patch.dict(tds.c_config["files"]): + # open eelf error logs + tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_debug_file(self): """ @@ -65,86 +178,248 @@ class test_trapd_io(unittest.TestCase): """ # open eelf debug logs - tds.c_config["files.eelf_debug"] = "/bad_dir/debug.log" + with patch.dict(tds.c_config["files"]): + tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_audit_file(self): """ Test bad audit file location """ - # open eelf debug logs - tds.c_config["files.eelf_audit"] = "/bad_dir/audit.log" + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_metrics_file(self): """ Test bad metrics file location """ + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_error_file_missing_name(self): + """ + Test bad error file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf error logs + del tds.c_config["files"]["eelf_error"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_debug_file_missing_name(self): + """ + Test bad debug file location + """ + # open eelf debug logs - tds.c_config["files.eelf_metrics"] = "/bad_dir/metrics.log" + with patch.dict(tds.c_config["files"]): + del tds.c_config["files"]["eelf_debug"] - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - def test_roll_all_logs(self): + + def test_open_eelf_audit_file_missing_name(self): + """ + Test bad audit file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_audit"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_metrics_file_missing_name(self): + """ + Test bad metrics file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_metrics"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_roll_all_logs_not_open(self): """ Test roll of logs when not open """ - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to roll logs when not open. Shouldn't fail + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + def test_roll_all_logs(self): + """ + Test rolling files that they are open + """ + + trapd_io.open_eelf_logs() + # try to roll logs + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_roll_file_throws(self): + """ + Test rolling files that they are open + but roll_file throws an exception + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.roll_file') as roll_file_throws: + roll_file_throws.side_effect = RuntimeError("roll_file() throws") + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_eelf_logs_returns_false(self): + """ + Test rolling files that they are open + but open_eelf_logs returns false + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws: + open_eelf_logs_throws.return_value = False + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_json_traps_throws(self): + """ + Test rolling files that they are open + but open_file(json_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.json_traps_filename: + raise RuntimeError("json_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_arriving_traps_throws(self): + """ + Test rolling files that they are open + but open_file(arriving_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.arriving_traps_filename: + raise RuntimeError("arriving_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + def test_roll_file(self): """ Test roll of individual file when not present """ + # try to roll a valid log file + with tempfile.TemporaryDirectory() as ntd: + fn = ntd + "/test.log" + with open(fn, "w") as ofp: + self.assertTrue(trapd_io.roll_file(fn)) + # The file will be renamed to something like + # test.log.2022-08-17T20:28:32 + self.assertFalse(os.path.exists(fn)) + # We could also add a test to see if there is a file + # with a name like that. + files = list(glob.glob(f"{ntd}/*")) + print(f"files={files}") + self.assertEqual(len(files), 1) + self.assertTrue(files[0].startswith(fn + ".")) + + + def test_roll_file_not_present(self): + """ + Test roll of individual file when not present + """ + # try to roll logs when not open - result = trapd_io.roll_file("/file/not/present") - self.assertEqual(result, False) + self.assertFalse(trapd_io.roll_file("/file/not/present")) + def test_roll_file_no_write_perms(self): """ try to roll logs when not enough perms """ - no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" - no_perms_file = "test.dat" - no_perms_fp = no_perms_dir + "/" + no_perms_file + with tempfile.TemporaryDirectory() as no_perms_dir: + # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" + no_perms_file = "test.dat" + no_perms_fp = no_perms_dir + "/" + no_perms_file - # required directory tree - try: - Path(no_perms_dir).mkdir(parents=True, exist_ok=True) - os.chmod(no_perms_dir, 0o777) - except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) + # required directory tree + #try: + # Path(no_perms_dir).mkdir(parents=True, exist_ok=True) + # os.chmod(no_perms_dir, 0o700) + #except Exception as e: + # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + + # create empty file + open(no_perms_fp, "w").close() + os.chmod(no_perms_dir, 0o555) - # create empty file - open(no_perms_fp, "a").close() - os.chmod(no_perms_dir, 0o444) + # try to roll file in dir with no write perms + self.assertFalse(trapd_io.roll_file(no_perms_fp)) - result = trapd_io.roll_file(no_perms_fp) - self.assertEqual(result, False) + # the file should still be there + open(no_perms_fp).close() + + # allow the directory to be removed + os.chmod(no_perms_dir, 0o700) - # try to roll file in dir with no write perms - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_file(no_perms_fp) - assert pytest_wrapped_exception.type == SystemExit def test_open_file_exists(self): """ @@ -156,8 +431,9 @@ class test_trapd_io(unittest.TestCase): # try to roll logs when not open result = trapd_io.open_file(test_file) - compare = str(result).startswith("<_io.TextIOWrapper name=") - self.assertEqual(compare, True) + self.assertTrue(str(result).startswith("<_io.TextIOWrapper name=")) + self.assertIsInstance(result, io.TextIOWrapper) + def test_open_file_exists_does_not_exist(self): """ @@ -168,9 +444,9 @@ class test_trapd_io(unittest.TestCase): test_file = "/tmp/no_such_dir/snmptrap_pytest" # try to open file when dir not present - with pytest.raises(SystemExit) as pytest_wrapped_exception: + with self.assertRaises(SystemExit): result = trapd_io.open_file(test_file) - assert pytest_wrapped_exception.type == SystemExit + def test_close_file_exists(self): """ @@ -182,18 +458,113 @@ class test_trapd_io(unittest.TestCase): test_file = trapd_io.open_file(test_file_name) # close active file - result = trapd_io.close_file(test_file, test_file_name) - self.assertEqual(result, True) + self.assertTrue(trapd_io.close_file(test_file, test_file_name)) + - def test_close_file_does_not_exists(self): + def test_close_file_does_not_exist(self): """ Test closing non-existent file """ # try to roll logs when not open - result = trapd_io.close_file(None, None) - self.assertEqual(result, False) + self.assertFalse(trapd_io.close_file(None, None)) + + + def test_ecomp_logger_type_error(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_error_bad_fd(self): + """ + test trapd_io.ecomp_logger, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_unknown_bad_fd(self): + """ + test trapd_io.ecomp_logger, unknown type, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_metrics(self): + """ + test trapd_io.ecomp_logger to metrics + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_metrics_bad_fd(self): + """ + test trapd_io.ecomp_logger to metrics, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_metrics_fd = tds.eelf_metrics_fd + tds.eelf_metrics_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_metrics_fd = sv_eelf_metrics_fd + + + def test_ecomp_logger_type_audit(self): + """ + test trapd_io.ecomp_logger to audit log + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_audit_bad_fd(self): + """ + test trapd_io.ecomp_logger to audit log, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_audit_fd = tds.eelf_audit_fd + tds.eelf_audit_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_audit_fd = sv_eelf_audit_fd + + + def test_ecomp_logger_type_unknown(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py index c6b8601..923f730 100644 --- a/tests/test_trapd_runtime_pid.py +++ b/tests/test_trapd_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,24 +14,30 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest +import os import unittest -import trapd_runtime_pid +from unittest.mock import patch + import trapd_io +import trapd_runtime_pid -class test_save_pid(unittest.TestCase): +class test_trapd_runtime_pid(unittest.TestCase): """ Test the save_pid mod """ + GOOD_PID_FILE = "/tmp/snmptrap_test_pid_file" + BAD_PID_FILE = "/tmp/snmptrap_test_pid_file_not_there" + def test_correct_usage(self): """ Test that attempt to create pid file in standard location works """ - result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") + result = trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE) self.assertEqual(result, True) + def test_missing_directory(self): """ Test that attempt to create pid file in missing dir fails @@ -40,7 +46,6 @@ class test_save_pid(unittest.TestCase): self.assertEqual(result, False) -class test_rm_pid(unittest.TestCase): """ Test the rm_pid mod """ @@ -50,18 +55,26 @@ class test_rm_pid(unittest.TestCase): Test that attempt to remove pid file in standard location works """ # must create it before removing it - result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") - self.assertEqual(result, True) - result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file") - self.assertEqual(result, True) + self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + self.assertTrue(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + def test_missing_file(self): """ Test that attempt to rm non-existent pid file fails """ - result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999") - self.assertEqual(result, False) + self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.BAD_PID_FILE)) + + + def test_correct_usage_but_throws(self): + """ + Test that an exception while removing returns false + """ + self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + with patch('os.remove') as mock_remove: + mock_remove.side_effect = RuntimeError("os.remove throws") + self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py index 92a3144..5625d78 100644 --- a/tests/test_trapd_settings.py +++ b/tests/test_trapd_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -24,66 +23,43 @@ pid_file_dne = "/tmp/test_pid_file_NOT" import trapd_settings as tds -class test_cleanup_and_exit(unittest.TestCase): +class test_trapd_settings(unittest.TestCase): """ Test for presense of required vars """ + @classmethod + def setUpClass(cls): + tds.init() + + def test_nonexistent_dict(self): """ Test nosuch var """ - tds.init() - try: - tds.no_such_var - result = True - except: - result = False + self.assertFalse(hasattr(tds, 'no_such_var')) - self.assertEqual(result, False) def test_config_dict(self): """ Test config dict """ - tds.init() - try: - tds.c_config - result = True - except: - result = False + self.assertTrue(hasattr(tds, 'c_config')) - self.assertEqual(result, True) def test_dns_cache_ip_to_name(self): """ Test dns cache name dict """ + self.assertTrue(hasattr(tds, 'dns_cache_ip_to_name')) - tds.init() - try: - tds.dns_cache_ip_to_name - result = True - except: - result = False - - self.assertEqual(result, True) def test_dns_cache_ip_expires(self): """ Test dns cache ip expires dict """ - - tds.init() - try: - tds.dns_cache_ip_expires - result = True - except: - result = False - - self.assertEqual(result, True) + self.assertTrue(hasattr(tds, 'dns_cache_ip_expires')) -if __name__ == "__main__": - # tds.init() +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_snmpv3.py b/tests/test_trapd_snmpv3.py index eac6082..2011953 100644 --- a/tests/test_trapd_snmpv3.py +++ b/tests/test_trapd_snmpv3.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,10 +14,10 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import json import unittest import os +import traceback from onap_dcae_cbs_docker_client.client import get_config from trapd_exit import cleanup_and_exit @@ -33,48 +33,459 @@ class test_snmpv3_config(unittest.TestCase): Test snmpv3 module """ + JSON_START = ( + '{' + ' "snmptrapd": {' + ' "version": "2.0",' + ' "title": "ONAP SNMP Trap Receiver"' + ' },' + ' "protocols": {' + ' "transport": "udp",' + ' "ipv4_interface": "0.0.0.0",' + ' "ipv4_port": 6162,' + ' "ipv6_interface": "::1",' + ' "ipv6_port": 6162' + ' },' + ' "cache": {' + ' "dns_cache_ttl_seconds": 10800' + ' },' + ' "publisher": {' + ' "http_milliseconds_timeout": 500,' + ' "http_retries": 2,' + ' "http_milliseconds_between_retries": 250,' + ' "http_primary_publisher": "true",' + ' "http_peer_publisher": "unavailable",' + ' "max_traps_between_publishes": 10,' + ' "max_milliseconds_between_publishes": 10000' + ' },' + ' "streams_publishes": {' + ' "sec_fault_unsecure": {' + ' "type": "message_router",' + ' "aaf_password": null,' + ' "dmaap_info": {' + ' "location": "mtl5",' + ' "client_id": null,' + ' "client_role": null,' + ' "topic_url": "http://localhost:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP"' + ' },' + ' "aaf_username": null' + ' }' + ' },' + ' "files": {' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap",' + ' "log_dir": "logs",' + ' "data_dir": "data",' + ' "pid_dir": "tmp",' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log",' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log",' + ' "traps_stats_log": "snmptrapd_stats.csv",' + ' "perm_status_file": "snmptrapd_status.log",' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs",' + ' "eelf_error": "error.log",' + ' "eelf_debug": "debug.log",' + ' "eelf_audit": "audit.log",' + ' "eelf_metrics": "metrics.log",' + ' "roll_frequency": "hour",' + ' "minimum_severity_to_log": 3' + ' },' + ' "check_hb_traps": {' + ' "trap_thr": 900,' + ' "hb_thr": 900,' + ' "hb_notify_oid": ".1.3.6.1.4.1.74.2.46.12.1.1"' + ' },' + ' "trap_config": {' + ' "sw_interval_in_seconds": 60,' + ' "metric_log_notification_threshold_pct": 25,' + ' "notify_oids": [' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.1",' + ' "sw_high_water_in_interval": 100,' + ' "sw_low_water_in_interval": 5,' + ' "category": "logonly"' + ' },' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.2",' + ' "sw_high_water_in_interval": 200,' + ' "sw_low_water_in_interval": 10,' + ' "category": "logonly"' + ' },' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.3",' + ' "sw_high_water_in_interval": 300,' + ' "sw_low_water_in_interval": 15,' + ' "category": "logonly"' + ' }' + ' ]' + ' }' + ) + JSON_USERS = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "user": "user1",' + ' "engineId": "8000000000000001",' + ' "usmHMACMD5AuthProtocol": "authkey1",' + ' "usmDESPrivProtocol": "privkey1"' + ' },' + ' {' + ' "user": "user2",' + ' "engineId": "8000000000000002",' + ' "usmHMACMD5AuthProtocol": "authkey2",' + ' "usm3DESEDEPrivProtocol": "privkey2"' + ' },' + ' {' + ' "user": "user3",' + ' "engineId": "8000000000000003",' + ' "usmHMACMD5AuthProtocol": "authkey3",' + ' "usmAesCfb128Protocol": "privkey3"' + ' },' + ' {' + ' "user": "user4",' + ' "engineId": "8000000000000004",' + ' "usmHMACMD5AuthProtocol": "authkey4",' + ' "usmAesBlumenthalCfb192Protocol": "privkey4"' + ' },' + ' {' + ' "user": "user5",' + ' "engineId": "8000000000000005",' + ' "usmHMACMD5AuthProtocol": "authkey5",' + ' "usmAesBlumenthalCfb256Protocol": "privkey5"' + ' },' + ' {' + ' "user": "user6",' + ' "engineId": "8000000000000006",' + ' "usmHMACMD5AuthProtocol": "authkey6",' + ' "usmAesCfb192Protocol": "privkey6"' + ' },' + ' {' + ' "user": "user7",' + ' "engineId": "8000000000000007",' + ' "usmHMACMD5AuthProtocol": "authkey7",' + ' "usmAesCfb256Protocol": "privkey7"' + ' },' + ' {' + ' "user": "user9",' + ' "engineId": "8000000000000009",' + ' "usmHMACSHAAuthProtocol": "authkey9",' + ' "usmDESPrivProtocol": "privkey9"' + ' },' + ' {' + ' "user": "user10",' + ' "engineId": "8000000000000010",' + ' "usmHMACSHAAuthProtocol": "authkey10",' + ' "usm3DESEDEPrivProtocol": "privkey10"' + ' },' + ' {' + ' "user": "user11",' + ' "engineId": "8000000000000011",' + ' "usmHMACSHAAuthProtocol": "authkey11",' + ' "usmAesCfb128Protocol": "privkey11"' + ' },' + ' {' + ' "user": "user12",' + ' "engineId": "8000000000000012",' + ' "usmHMACSHAAuthProtocol": "authkey12",' + ' "usmAesBlumenthalCfb192Protocol": "privkey12"' + ' },' + ' {' + ' "user": "user13",' + ' "engineId": "8000000000000013",' + ' "usmHMACSHAAuthProtocol": "authkey13",' + ' "usmAesBlumenthalCfb256Protocol": "privkey13"' + ' },' + ' {' + ' "user": "user14",' + ' "engineId": "8000000000000014",' + ' "usmHMACSHAAuthProtocol": "authkey14",' + ' "usmAesCfb192Protocol": "privkey14"' + ' },' + ' {' + ' "user": "user15",' + ' "engineId": "8000000000000015",' + ' "usmHMACSHAAuthProtocol": "authkey15",' + ' "usmAesCfb256Protocol": "privkey15"' + ' },' + ' {' + ' "user": "user17",' + ' "engineId": "8000000000000017",' + ' "usmHMAC128SHA224AuthProtocol": "authkey17",' + ' "usmDESPrivProtocol": "privkey17"' + ' },' + ' {' + ' "user": "user18",' + ' "engineId": "8000000000000018",' + ' "usmHMAC128SHA224AuthProtocol": "authkey18",' + ' "usm3DESEDEPrivProtocol": "privkey18"' + ' },' + ' {' + ' "user": "user19",' + ' "engineId": "8000000000000019",' + ' "usmHMAC128SHA224AuthProtocol": "authkey19",' + ' "usmAesCfb128Protocol": "privkey19"' + ' },' + ' {' + ' "user": "user20",' + ' "engineId": "8000000000000020",' + ' "usmHMAC128SHA224AuthProtocol": "authkey20",' + ' "usmAesBlumenthalCfb192Protocol": "privkey20"' + ' },' + ' {' + ' "user": "user21",' + ' "engineId": "8000000000000021",' + ' "usmHMAC128SHA224AuthProtocol": "authkey21",' + ' "usmAesBlumenthalCfb256Protocol": "privkey21"' + ' },' + ' {' + ' "user": "user22",' + ' "engineId": "8000000000000022",' + ' "usmHMAC128SHA224AuthProtocol": "authkey22",' + ' "usmAesCfb192Protocol": "privkey22"' + ' },' + ' {' + ' "user": "user23",' + ' "engineId": "8000000000000023",' + ' "usmHMAC128SHA224AuthProtocol": "authkey23",' + ' "usmAesCfb256Protocol": "privkey23"' + ' },' + ' {' + ' "user": "user25",' + ' "engineId": "8000000000000025",' + ' "usmHMAC192SHA256AuthProtocol": "authkey25",' + ' "usmDESPrivProtocol": "privkey25"' + ' },' + ' {' + ' "user": "user26",' + ' "engineId": "8000000000000026",' + ' "usmHMAC192SHA256AuthProtocol": "authkey26",' + ' "usm3DESEDEPrivProtocol": "privkey26"' + ' },' + ' {' + ' "user": "user27",' + ' "engineId": "8000000000000027",' + ' "usmHMAC192SHA256AuthProtocol": "authkey27",' + ' "usmAesCfb128Protocol": "privkey27"' + ' },' + ' {' + ' "user": "user28",' + ' "engineId": "8000000000000028",' + ' "usmHMAC192SHA256AuthProtocol": "authkey28",' + ' "usmAesBlumenthalCfb192Protocol": "privkey28"' + ' },' + ' {' + ' "user": "user29",' + ' "engineId": "8000000000000029",' + ' "usmHMAC192SHA256AuthProtocol": "authkey29",' + ' "usmAesBlumenthalCfb256Protocol": "privkey29"' + ' },' + ' {' + ' "user": "user30",' + ' "engineId": "8000000000000030",' + ' "usmHMAC192SHA256AuthProtocol": "authkey30",' + ' "usmAesCfb192Protocol": "privkey30"' + ' },' + ' {' + ' "user": "user31",' + ' "engineId": "8000000000000031",' + ' "usmHMAC192SHA256AuthProtocol": "authkey31",' + ' "usmAesCfb256Protocol": "privkey31"' + ' },' + ' {' + ' "user": "user33",' + ' "engineId": "8000000000000033",' + ' "usmHMAC256SHA384AuthProtocol": "authkey33",' + ' "usmDESPrivProtocol": "privkey33"' + ' },' + ' {' + ' "user": "user34",' + ' "engineId": "8000000000000034",' + ' "usmHMAC256SHA384AuthProtocol": "authkey34",' + ' "usm3DESEDEPrivProtocol": "privkey34"' + ' },' + ' {' + ' "user": "user35",' + ' "engineId": "8000000000000035",' + ' "usmHMAC256SHA384AuthProtocol": "authkey35",' + ' "usmAesCfb128Protocol": "privkey35"' + ' },' + ' {' + ' "user": "user36",' + ' "engineId": "8000000000000036",' + ' "usmHMAC256SHA384AuthProtocol": "authkey36",' + ' "usmAesBlumenthalCfb192Protocol": "privkey36"' + ' },' + ' {' + ' "user": "user37",' + ' "engineId": "8000000000000037",' + ' "usmHMAC256SHA384AuthProtocol": "authkey37",' + ' "usmAesBlumenthalCfb256Protocol": "privkey37"' + ' },' + ' {' + ' "user": "user38",' + ' "engineId": "8000000000000038",' + ' "usmHMAC256SHA384AuthProtocol": "authkey38",' + ' "usmAesCfb192Protocol": "privkey38"' + ' },' + ' {' + ' "user": "user39",' + ' "engineId": "8000000000000039",' + ' "usmHMAC256SHA384AuthProtocol": "authkey39",' + ' "usmAesCfb256Protocol": "privkey39"' + ' },' + ' {' + ' "user": "user41",' + ' "engineId": "8000000000000041",' + ' "usmHMAC384SHA512AuthProtocol": "authkey41",' + ' "usmDESPrivProtocol": "privkey41"' + ' },' + ' {' + ' "user": "user42",' + ' "engineId": "8000000000000042",' + ' "usmHMAC384SHA512AuthProtocol": "authkey42",' + ' "usm3DESEDEPrivProtocol": "privkey42"' + ' },' + ' {' + ' "user": "user43",' + ' "engineId": "8000000000000043",' + ' "usmHMAC384SHA512AuthProtocol": "authkey43",' + ' "usmAesCfb128Protocol": "privkey43"' + ' },' + ' {' + ' "user": "user44",' + ' "engineId": "8000000000000044",' + ' "usmHMAC384SHA512AuthProtocol": "authkey44",' + ' "usmAesBlumenthalCfb192Protocol": "privkey44"' + ' },' + ' {' + ' "user": "user45",' + ' "engineId": "8000000000000045",' + ' "usmHMAC384SHA512AuthProtocol": "authkey45",' + ' "usmAesBlumenthalCfb256Protocol": "privkey45"' + ' },' + ' {' + ' "user": "user46",' + ' "engineId": "8000000000000046",' + ' "usmHMAC384SHA512AuthProtocol": "authkey46",' + ' "usmAesCfb192Protocol": "privkey46"' + ' },' + ' {' + ' "user": "user47",' + ' "engineId": "8000000000000047",' + ' "usmHMAC384SHA512AuthProtocol": "authkey47",' + ' "usmAesCfb256Protocol": "privkey47"' + ' },' + ' {' + ' "user": "user48",' + ' "engineId": "8000000000000048",' + ' "usmNoAuthProtocol": "authkey48",' + ' "usmNoPrivProtocol": "privkey48"' + ' },' + ' {' + ' "user": "user49",' + ' "engineId": "8000000000000049",' + ' "unknownAuthProtocol": "authkey49",' + ' "unknownProtocol": "privkey49"' + ' }' + ' ]' + ' }' + ) + JSON_MISSING_USER = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "baduser": "user50",' + ' "engineId": "8000000000000050",' + ' "unknownAuthProtocol": "authkey50",' + ' "unknownProtocol": "privkey50"' + ' }' + ' ]' + ' }' + ) + JSON_MISSING_ENGINE = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "user": "user51",' + ' "badengineId": "8000000000000051",' + ' "unknownAuthProtocol": "authkey51",' + ' "unknownProtocol": "privkey51"' + ' }' + ' ]' + ' }' + ) + JSON_COMMA = ',' + JSON_END = '}' + + @classmethod + def setUpClass(cls): + tds.init() + + def test_v3_config_present(self): """ Test that snmpv3 config is present """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" }, { "user": "user4"} , { "engineId": "1"}, { "user": "user6", "engineId": "8000000000000006", "usmNoAuth": "authkey3", "usmAesCfb192": "privkey6" }, { "user": "user7", "engineId": "8000000000000007", "usmNoAuth": "", "usmNoPriv": "" }] } }' + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_USERS + + test_snmpv3_config.JSON_END ) + tds.c_config = json.loads(pconfig) - # del os.environ['CBS_SIM_JSON'] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) - with pytest.raises(Exception) as pytest_wrapped_sys_exit: - snmp_engine = engine.SnmpEngine() - result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) - # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3) - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 def test_v3_config_not_present(self): """ Test that app is ok if v3 config not present """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } } }' - ) + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) - # del os.environ['CBS_SIM_JSON'] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) + + + @unittest.skip("need to understand what happens when a username is missing") + def test_v3_config_missing_user(self): + """ + Test that app is ok if v3 config has a missing user name + """ + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_MISSING_USER + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) + + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) + + + def test_v3_config_missing_engine(self): + """ + Test that app is ok if v3 config has a missing engine name + """ + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_MISSING_ENGINE + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) - with pytest.raises(Exception) as pytest_wrapped_sys_exit: - snmp_engine = engine.SnmpEngine() - result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) - # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3) - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py index 16b0a17..3041884 100644 --- a/tests/test_trapd_stormwatch.py +++ b/tests/test_trapd_stormwatch.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,14 +14,15 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit import time +from unittest.mock import patch import trapd_stormwatch as sw import trapd_stormwatch_settings as sws import trapd_stats_settings as stats +import trapd_settings as tds class test_cleanup_and_exit(unittest.TestCase): @@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase): Test for presense of required vars """ - def test_increment_existing_counter(self): - """ - Test increment counter - """ - sw.sw_init() + @classmethod + def setUp(cls): + tds.init() stats.init() + sw.sw_init() + sws.init() - oid = ".1.2.3.4.5.6" - sws.sw_config_oid_dict[oid] = True - sws.sw_config_low_water_in_interval_dict[oid] = 1 - sws.sw_config_high_water_in_interval_dict[oid] = 10 - - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False - - self.assertEqual(result, True) - - # try again, but with stats.total_notifications removed - delattr(stats, "total_notifications") - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False + def test_sw_init(self): + """ test sw_init() """ + sw.sw_init() + self.assertEqual(sws.sw_interval_in_seconds, 60) + sws.init() + self.assertEqual(sws.sw_config_category, {}) - self.assertEqual(result, True) def test_sw_clear_dicts(self): """ Test sw_clear_dicts """ - sw.sw_init() - # initialize attributes not handled by sw_init() - sws.sw_storm_counter_dict = {} - stats.agent_counter_dict = {} - stats.oid_counter_dict = {} - sws.sw_config_category = {} - # provide a value that can tested for - sws.sw_storm_counter_dict["abc"] = "def" + with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'): + with patch.dict('trapd_stormwatch_settings.sw_config_category'): + with patch.dict('trapd_stats_settings.agent_counter_dict'): + with patch.dict('trapd_stats_settings.oid_counter_dict'): + + # initialize attributes not handled by sw_init() + sws.sw_storm_counter_dict = {} + sws.sw_config_category = {} + stats.agent_counter_dict = {} + stats.oid_counter_dict = {} + + # provide a value that can tested for + sws.sw_storm_counter_dict["abc"] = "def" + self.assertTrue("abc" in sws.sw_storm_counter_dict) + + self.assertTrue(sw.sw_clear_dicts()) + self.assertFalse("abc" in sws.sw_storm_counter_dict) - sw.sw_clear_dicts() - self.assertFalse("abc" in sws.sw_storm_counter_dict) + # now make sure we get an exception + sws.sw_config_category = 3 + self.assertFalse(sw.sw_clear_dicts()) - # now make sure we get an exception - sws.sw_config_category = 3 - self.assertFalse(sw.sw_clear_dicts()) - # clean up the attributes we added above - delattr(sws, "sw_storm_counter_dict") - delattr(stats, "agent_counter_dict") - delattr(stats, "oid_counter_dict") - delattr(sws, "sw_config_category") + def test_sw_load_trap_config(self): + """ Test sw_load_trap_config(_config) """ + trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "trap_config": { }, + "notify OID len": 10, + } + + # normal operation, and variations + ret1 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret1, 0) + self.assertIsInstance(ret1, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { "oidx": "1.3.6.1.4.1.9999.9.9.888" } + ] + ret2 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret2, 0) + self.assertIsInstance(ret2, int) + + trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33 + ret3 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret3, 0) + self.assertIsInstance(ret3, int) + + trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50 + ret4 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret4, 0) + self.assertIsInstance(ret4, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { + "oid": "1.3.6.1.4.1.9999.9.9.888", + "sw_high_water_in_interval": 3, + "sw_low_water_in_interval": 2, + "category": "abc", + } + ] + ret5 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret5, 1) + self.assertIsInstance(ret5, int) + + delattr(sws, 'sw_storm_active_dict') + ret6 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret6, 1) + self.assertIsInstance(ret6, int) + def test_sw_log_metrics(self): """ Test sw_clear_log_metrics """ - sw.sw_init() - stats.total_notifications = 3 stats.total_notifications = 50 sws.sw_interval_in_seconds = 30 stats.agent_counter_dict = {"a": 3, "b": 40} stats.metric_log_notification_threshold_pct = 30 - sw.sw_log_metrics() - # make sure we got this far - assert True + with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger: + sw.sw_log_metrics() + self.assertEqual(magic_ecomp_logger.call_count, 3) + + + def test_increment_existing_counter(self): + """ + Test increment counter. There should NOT be an exception. + """ + oid = ".1.2.3.4.5.6" + sws.sw_config_oid_dict[oid] = True + sws.sw_config_low_water_in_interval_dict[oid] = 1 + sws.sw_config_high_water_in_interval_dict[oid] = 10 + + loc_agent = "192.168.1.1" + stats.agent_counter_dict[loc_agent] = 2 + stats.oid_counter_dict[oid] = 102 + + sv_total_notifications = stats.total_notifications + sv_agent_count_dict = stats.agent_counter_dict[loc_agent] + sv_oid_count_dict = stats.oid_counter_dict[oid] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+1) + self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1) + + # try again, without agent_counter_dict[loc_agent] + del stats.agent_counter_dict[loc_agent] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+2) + self.assertEqual(stats.agent_counter_dict[loc_agent], 1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2) + + # try again, but with stats.total_notifications removed + delattr(stats, "total_notifications") + + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except: + result = False + + self.assertEqual(result, True, "stats.total_notifications removed") + self.assertEqual(stats.total_notifications, 1) + self.assertEqual(stats.agent_counter_dict[loc_agent], 2) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3) + def test_sw_storm_active(self): """ Test sw_storm_active() """ - sw.sw_init() + print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}") + # initialize attributes not handled by sw_init() stats.oid_counter_dict = {} @@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase): self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) # now force sws.sw_last_stormwatch_dict_analysis to an old value - sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20 + sws.sw_last_stormwatch_dict_analysis = ( + int(time.time()) - sws.sw_interval_in_seconds - 20 + ) # and make certain that stats.oid_counter_dict got cleared. if not hasattr(stats, "oid_counter_dict"): stats.oid_counter_dict = {} @@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase): # .get("abc") != None) -if __name__ == "__main__": + @patch('trapd_stormwatch.ecomp_logger') + def test_sw_reset_counter_dict(self, magic_ecomp_logger): + """ test sw_reset_counter_dict() """ + + loc_agent = "192.168.1.1" + loc_oid = ".1.2.3.4.5.6" + loc_agent_oid = loc_agent + " " + loc_oid + + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 2) + + # cases: + # for lao in storm_active_dict: + # storm_counter_dict[lao] >= high_water_val[lo] + # storm_counter_dict[lao] < high_water_val[lo] + # storm_counter_dict[lao] < low_water_val[lo] + # storm_counter_dict[lao] >= low_water_val[lo] + + with patch.dict(sws.sw_storm_counter_dict, { + loc_agent_oid: 20 + }): + # values around the 20 above + for high_water_mark in [2, 60]: + # values around the 20 above + for low_water_mark in [0, 30]: + with patch.dict(sws.sw_config_high_water_in_interval_dict, { + loc_oid: high_water_mark + }): + with patch.dict(sws.sw_config_low_water_in_interval_dict, { + loc_oid: low_water_mark + }): + with patch.dict(sws.sw_storm_active_dict, { + loc_agent_oid: "anything" + }): + sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid] + magic_ecomp_logger.call_count = 0 + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 3) + self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0) + + +if __name__ == "__main__": # pragma: no cover # sws.init() unittest.main() diff --git a/tests/test_trapd_stormwatch_settings.py b/tests/test_trapd_stormwatch_settings.py index bcb04a7..f5ad9a7 100644 --- a/tests/test_trapd_stormwatch_settings.py +++ b/tests/test_trapd_stormwatch_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -34,26 +33,16 @@ class test_cleanup_and_exit(unittest.TestCase): Test nosuch var """ sws.init() - try: - sws.no_such_var - result = True - except: - result = False + self.assertFalse(hasattr(sws, 'no_such_var')) - self.assertEqual(result, False) def test_storm_counter_dict(self): """ Test storm_counter_dict """ sws.init() - try: - sws.sw_storm_counter_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_storm_counter_dict')) - self.assertEqual(result, True) def test_storm_active_dict(self): """ @@ -61,13 +50,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_storm_active_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_storm_active_dict')) - self.assertEqual(result, True) def test_sw_config_oid_dict(self): """ @@ -75,13 +59,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_oid_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_oid_dict')) - self.assertEqual(result, True) def test_sw_config_low_water_in_interval_dict(self): """ @@ -89,13 +68,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_low_water_in_interval_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_low_water_in_interval_dict')) - self.assertEqual(result, True) def test_sw_config_high_water_in_interval_dict(self): """ @@ -103,13 +77,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_high_water_in_interval_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_high_water_in_interval_dict')) - self.assertEqual(result, True) def test_sw_config_category(self): """ @@ -117,43 +86,26 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_category - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_category')) - self.assertEqual(result, True) def test_sw_interval_in_seconds(self): """ Test sw_interval """ - sws.init() - try: - str(sws.sw_interval_in_seconds).isnumeric() - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_interval_in_seconds')) + self.assertTrue(str(sws.sw_interval_in_seconds).isnumeric()) - self.assertEqual(result, True) def test_sw_last_stormwatch_dict_analysis(self): """ Test last_stormwatch_dict_analysis """ - sws.init() - try: - str(sws.sw_last_stormwatch_dict_analysis).isnumeric() - result = True - except: - result = False - - self.assertEqual(result, True) + self.assertTrue(hasattr(sws, 'sw_last_stormwatch_dict_analysis')) + self.assertTrue(str(sws.sw_last_stormwatch_dict_analysis).isnumeric()) -if __name__ == "__main__": - # sws.init() - unittest.main() +if __name__ == "__main__": # pragma: no cover + unittest.main(verbosity=2) diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py index 5792b8c..28150c9 100644 --- a/tests/test_trapd_vb_types.py +++ b/tests/test_trapd_vb_types.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,18 +14,10 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest -import json import unittest -import os -from onap_dcae_cbs_docker_client.client import get_config -from trapd_exit import cleanup_and_exit -from trapd_io import stdout_logger, ecomp_logger -import trapd_settings as tds import trapd_vb_types - -from pysnmp.entity import engine, config +import trapd_settings as tds class test_trapd_vb_types(unittest.TestCase): @@ -33,121 +25,102 @@ class test_trapd_vb_types(unittest.TestCase): Test snmpv3 module """ - good_varbind_types = [ - "Integer", - "Unsigned32", - "Counter32", - "OctetString", - "ObjectIdentifier", - "TimeTicks", - "IpAddress", - ] + @classmethod + def setUpClass(cls): + tds.init() - def trapd_vb_type_conversion_integer(self): + + def test_trapd_vb_type_conversion_integer32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32"), "integer") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32") - self.assertEqual(result, "integer") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_integer(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer"), "integer") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer") - self.assertEqual(result, "integer") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_gauge32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32"), "unsigned") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32") - self.assertEqual(result, "unsigned") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_counter32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32"), "counter32") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32") - self.assertEqual(result, "counter32") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_octetstring(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString"), "octet") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString") - self.assertEqual(result, "octet") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_py_type_5(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5"), "hex") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5") - self.assertEqual(result, "hex") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_py_type_6(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6"), "decimal") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6") - self.assertEqual(result, "decimal") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_null(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null"), "null") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null") - self.assertEqual(result, "null") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_objectidentifier(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier"), "oid") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier") - self.assertEqual(result, "oid") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_timeticks(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks"), "timeticks") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks") - self.assertEqual(result, "timeticks") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_ipaddress(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress"), "ipaddress") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress") - self.assertEqual(result, "ipaddress") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_bits(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits"), "bits") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits") - self.assertEqual(result, "bits") - def trapd_vb_type_conversion_invalid(self): + def test_trapd_vb_type_conversion_invalid(self): """ Test that pysnmp varbind types convert accurately """ - - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType") # should return default of octet if not defined - self.assertEqual(result, "octet") + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType"), "octet") -if __name__ == "__main__": - unittest.main() +if __name__ == "__main__": # pragma: no cover + unittest.main(verbosity=2) @@ -1,23 +1,24 @@ # content of: tox.ini , put in same dir as setup.py [tox] # envlist = py36 -envlist = py36,py37,py38,py39 -skip_missing_interpreters = true +# envlist = py36,py37,py38,py39 +envlist = py39 +# skip_missing_interpreters = true +# isolated_build = True [testenv] deps= - -rrequirements.txt + -rrequirements.txt pytest coverage pytest-cov + setenv = PYTHONPATH={toxinidir}/snmptrap:{toxinidir}/snmptrap/mod:{toxinidir}/tests CBS_SIM_JSON={toxinidir}/etc/snmptrapd.json -recreate = True + +# recreate = True + commands= - mkdir -p /tmp/opt/app/snmptrap/logs/ - mkdir -p /tmp/opt/app/snmptrap/tmp/ - mkdir -p /tmp/opt/app/snmptrap/etc/ - mkdir -p /tmp/opt/app/snmptrap/data/ - pytest --cov snmptrap --cov-report=xml --cov-report=term tests --verbose + pytest --cov snmptrap --cov-report=xml --cov-report=html --cov-report=term tests --verbose --verbose --verbose whitelist_externals = mkdir diff --git a/version.properties b/version.properties index 7b12999..a640c99 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=2 minor=0 -patch=6 +patch=7 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |