aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2022-08-22 20:59:08 +0000
committerGerrit Code Review <gerrit@onap.org>2022-08-22 20:59:08 +0000
commit0ec57f0b34058a397aecee00c198e7559ecb7b0c (patch)
treedfbbca6511883ab5927e626fd910c77c7fe460bb
parentfd09404338108987774ffae7aa90b67370dd290d (diff)
parent4b6b34642374103b8ccf938eb9e970232bfb63ae (diff)
Merge "CodeCoverage improvement (60% to 90%)"
-rw-r--r--Changelog.md3
-rw-r--r--makefile5
-rw-r--r--pom.xml6
-rw-r--r--setup.py4
-rw-r--r--snmptrap/makefile2
-rw-r--r--snmptrap/mod/trapd_exit.py7
-rw-r--r--snmptrap/mod/trapd_get_cbs_config.py29
-rw-r--r--snmptrap/mod/trapd_http_session.py5
-rw-r--r--snmptrap/mod/trapd_io.py41
-rw-r--r--snmptrap/mod/trapd_runtime_pid.py9
-rw-r--r--snmptrap/mod/trapd_settings.py4
-rw-r--r--snmptrap/mod/trapd_snmpv3.py6
-rw-r--r--snmptrap/mod/trapd_stormwatch.py20
-rwxr-xr-xsnmptrap/scheduler.sh12
-rw-r--r--snmptrap/snmptrapd.py40
-rwxr-xr-xsnmptrap/snmptrapd.sh26
-rw-r--r--tests/test_snmptrapd.py610
-rw-r--r--tests/test_trapd_exit.py26
-rw-r--r--tests/test_trapd_get_cbs_config.py221
-rw-r--r--tests/test_trapd_http_session.py63
-rw-r--r--tests/test_trapd_io.py521
-rw-r--r--tests/test_trapd_runtime_pid.py39
-rw-r--r--tests/test_trapd_settings.py48
-rw-r--r--tests/test_trapd_snmpv3.py471
-rw-r--r--tests/test_trapd_stormwatch.py250
-rw-r--r--tests/test_trapd_stormwatch_settings.py76
-rw-r--r--tests/test_trapd_vb_types.py95
-rw-r--r--tox.ini19
-rw-r--r--version.properties2
29 files changed, 2010 insertions, 650 deletions
diff --git a/Changelog.md b/Changelog.md
index 78b3a8a..c102c46 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [2.0.7] - 2022/08/17
+- [DCAEGEN2-3158] CodeCoverage improvement for dcaegen2-collectors-snmptrap (60% to 90%)
+
## [2.0.6] - 2021/10/26
### Changed
* [DCAEGEN2-2957] SNMP Trap collector - STDOUT complaince
diff --git a/makefile b/makefile
index 6cece87..0bd00a3 100644
--- a/makefile
+++ b/makefile
@@ -1,3 +1,4 @@
runtests:
- . ~/bin/set_proxies; tox tests | cat
- coverage html
+ -[ -d /tmp/opt ] && find /tmp/opt -type d -exec chmod 755 {} +
+ rm -rf /tmp/opt
+ tox tests | cat
diff --git a/pom.xml b/pom.xml
index 8d872f4..d180e03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<!--
-================================================================================
-Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+============LICENSE_START=======================================================
+Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>snmptrap</artifactId>
<name>dcaegen2-collectors-snmptrap</name>
- <version>2.0.6-SNAPSHOT</version>
+ <version>2.0.7-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/setup.py b/setup.py
index 8716437..0fb95b0 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2021 Deutsche Telekom. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,7 +25,7 @@ from setuptools import setup, find_packages
setup(
name="snmptrap",
description="snmp trap receiver for ONAP docker image",
- version="2.0.6",
+ version="2.0.7",
packages=find_packages(),
install_requires=["pysnmp==4.4.12", "requests==2.18.3", "onap_dcae_cbs_docker_client==2.2.1", "pyyaml"],
author="Dave L",
diff --git a/snmptrap/makefile b/snmptrap/makefile
new file mode 100644
index 0000000..aa9d159
--- /dev/null
+++ b/snmptrap/makefile
@@ -0,0 +1,2 @@
+runtests:
+ cd .. && $(MAKE) runtests
diff --git a/snmptrap/mod/trapd_exit.py b/snmptrap/mod/trapd_exit.py
index 1d1ea16..0959399 100644
--- a/snmptrap/mod/trapd_exit.py
+++ b/snmptrap/mod/trapd_exit.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -46,13 +46,8 @@ def cleanup_and_exit(_loc_exit_code, _pid_file_name):
none
:Keywords:
runtime PID exit
- :Variables:
- _num_params
- number of parameters passed to module
"""
- # _num_params = len(locals())
-
if _pid_file_name is not None:
rc = rm_pid(_pid_file_name)
sys.exit(_loc_exit_code)
diff --git a/snmptrap/mod/trapd_get_cbs_config.py b/snmptrap/mod/trapd_get_cbs_config.py
index cfae994..9ed7916 100644
--- a/snmptrap/mod/trapd_get_cbs_config.py
+++ b/snmptrap/mod/trapd_get_cbs_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,28 +64,23 @@ def get_cbs_config():
msg = "ONAP controller not present, trying json config override via CBS_SIM_JSON env variable"
stdout_logger(msg)
- try:
- _cbs_sim_json_file = os.getenv("CBS_SIM_JSON", "None")
- except Exception as e:
+ _cbs_sim_json_file = os.getenv("CBS_SIM_JSON")
+
+ if _cbs_sim_json_file is None:
msg = "CBS_SIM_JSON not defined - FATAL ERROR, exiting"
stdout_logger(msg)
cleanup_and_exit(1, None)
- if _cbs_sim_json_file == "None":
- msg = "CBS_SIM_JSON not defined - FATAL ERROR, exiting"
+ msg = "ONAP controller override specified via CBS_SIM_JSON: %s" % _cbs_sim_json_file
+ stdout_logger(msg)
+ try:
+ tds.c_config = json.load(open(_cbs_sim_json_file))
+ msg = "%s loaded and parsed successfully" % _cbs_sim_json_file
stdout_logger(msg)
- cleanup_and_exit(1, None)
- else:
- msg = "ONAP controller override specified via CBS_SIM_JSON: %s" % _cbs_sim_json_file
+ except Exception as e:
+ msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting"
stdout_logger(msg)
- try:
- tds.c_config = json.load(open(_cbs_sim_json_file))
- msg = "%s loaded and parsed successfully" % _cbs_sim_json_file
- stdout_logger(msg)
- except Exception as e:
- msg = "Unable to load CBS_SIM_JSON " + _cbs_sim_json_file + " (invalid json?) - FATAL ERROR, exiting"
- stdout_logger(msg)
- cleanup_and_exit(1, None)
+ cleanup_and_exit(1, None)
# display consul config returned, regardless of source
msg = "cbs config: %s" % json.dumps(tds.c_config)
diff --git a/snmptrap/mod/trapd_http_session.py b/snmptrap/mod/trapd_http_session.py
index 7118d00..1c93edc 100644
--- a/snmptrap/mod/trapd_http_session.py
+++ b/snmptrap/mod/trapd_http_session.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,6 +23,9 @@ __docformat__ = "restructuredtext"
import os
import requests
import traceback
+from trapd_io import ecomp_logger, stdout_logger
+import trapd_settings as tds
+from trapd_exit import cleanup_and_exit
prog_name = os.path.basename(__file__)
diff --git a/snmptrap/mod/trapd_io.py b/snmptrap/mod/trapd_io.py
index f7c48b7..2ba44b4 100644
--- a/snmptrap/mod/trapd_io.py
+++ b/snmptrap/mod/trapd_io.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -54,15 +54,19 @@ def roll_all_logs():
# NOTE: this will go away when onap logging is standardized/available
try:
# open various ecomp logs - if any fails, exit
- for fd in [
- tds.eelf_error_fd,
- tds.eelf_debug_fd,
- tds.eelf_audit_fd,
- tds.eelf_metrics_fd,
- tds.arriving_traps_fd,
- tds.json_traps_fd,
+ for fd, nm in [
+ (tds.eelf_error_fd, "error"),
+ (tds.eelf_debug_fd, "debug"),
+ (tds.eelf_audit_fd, "eelf"),
+ (tds.eelf_metrics_fd, "metrics"),
+ (tds.arriving_traps_fd, "arriving_traps"),
+ (tds.json_traps_fd, "json_traps"),
]:
- fd.close()
+ if fd is None:
+ msg = "Error closing log file for " + nm
+ stdout_logger(msg)
+ else:
+ fd.close()
roll_file(tds.eelf_error_file_name)
roll_file(tds.eelf_debug_file_name)
@@ -86,7 +90,7 @@ def roll_all_logs():
try:
tds.json_traps_fd = open_file(tds.json_traps_filename)
except Exception as e:
- msg = "Error opening json_log %s : %s" % (json_traps_filename, str(e))
+ msg = "Error opening json_log %s : %s" % (tds.json_traps_filename, str(e))
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -96,7 +100,7 @@ def roll_all_logs():
try:
tds.arriving_traps_fd = open_file(tds.arriving_traps_filename)
except Exception as e:
- msg = "Error opening arriving traps %s : %s" % (arriving_traps_filename, str(e))
+ msg = "Error opening arriving traps %s : %s" % (tds.arriving_traps_filename, str(e))
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -119,6 +123,7 @@ def open_eelf_logs():
tds.eelf_error_fd = open_file(tds.eelf_error_file_name)
except Exception as e:
+ # here if we cannot create the filename
msg = "Error opening eelf error log : " + str(e)
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -128,6 +133,7 @@ def open_eelf_logs():
tds.eelf_debug_fd = open_file(tds.eelf_debug_file_name)
except Exception as e:
+ # here if we cannot create the filename
msg = "Error opening eelf debug log : " + str(e)
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -136,6 +142,7 @@ def open_eelf_logs():
tds.eelf_audit_file_name = tds.c_config["files"]["eelf_base_dir"] + "/" + tds.c_config["files"]["eelf_audit"]
tds.eelf_audit_fd = open_file(tds.eelf_audit_file_name)
except Exception as e:
+ # here if we cannot create the filename
msg = "Error opening eelf audit log : " + str(e)
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -146,6 +153,7 @@ def open_eelf_logs():
)
tds.eelf_metrics_fd = open_file(tds.eelf_metrics_file_name)
except Exception as e:
+ # here if we cannot create the filename
msg = "Error opening eelf metric log : " + str(e)
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
@@ -202,15 +210,14 @@ def open_file(_loc_file_name):
stdout_logger(msg)
cleanup_and_exit(1, tds.pid_file_name)
- # # # # # # # # # # # # #
- # fx: close_file
- # # # # # # # # # # # # #
- """
- close _loc_file_name, return True with success, False otherwise
- """
+
+# # # # # # # # # # # # #
+# fx: close_file
+# # # # # # # # # # # # #
def close_file(_loc_fd, _loc_filename):
+ """ close _loc_file_name, return True with success, False otherwise """
try:
_loc_fd.close()
diff --git a/snmptrap/mod/trapd_runtime_pid.py b/snmptrap/mod/trapd_runtime_pid.py
index c647885..0a9eac6 100644
--- a/snmptrap/mod/trapd_runtime_pid.py
+++ b/snmptrap/mod/trapd_runtime_pid.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -53,11 +53,7 @@ def save_pid(_pid_file_name):
except IOError:
print("IOError saving PID file %s :" % _pid_file_name)
return False
- # except:
- # print("Error saving PID file %s :" % _pid_file_name)
- # return False
else:
- # print("Runtime PID file: %s" % _pid_file_name)
return True
@@ -76,7 +72,6 @@ def rm_pid(_pid_file_name):
:Keywords:
pid /var/run
"""
-
try:
if os.path.isfile(_pid_file_name):
os.remove(_pid_file_name)
@@ -84,6 +79,6 @@ def rm_pid(_pid_file_name):
else:
return False
- except IOError:
+ except:
print("Error removing Runtime PID file: %s" % _pid_file_name)
return False
diff --git a/snmptrap/mod/trapd_settings.py b/snmptrap/mod/trapd_settings.py
index 26e7c9b..33cf56b 100644
--- a/snmptrap/mod/trapd_settings.py
+++ b/snmptrap/mod/trapd_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -114,7 +114,7 @@ def init():
global json_traps_filename
json_traps_filename = ""
global json_traps_fd
- json_fd = None
+ json_traps_fd = None
# </json log of traps published>
# <log of arriving traps >
diff --git a/snmptrap/mod/trapd_snmpv3.py b/snmptrap/mod/trapd_snmpv3.py
index 56ecb5c..1a1cc0c 100644
--- a/snmptrap/mod/trapd_snmpv3.py
+++ b/snmptrap/mod/trapd_snmpv3.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -82,8 +82,6 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config):
# find options at -> site-packages/pysnmp/entity/config.py
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
- # print("Checking auth for %s" % (userName))
-
# usmHMACMD5AuthProtocol
try:
authKey = v3_user["usmHMACMD5AuthProtocol"]
@@ -123,8 +121,6 @@ def load_snmpv3_credentials(_py_config, _snmp_engine, _cbs_config):
# find options at -> site-packages/pysnmp/entity/config.py
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
- # print("Checking priv for %s" % (userName))
-
# usm3DESEDEPriv
try:
privKey = v3_user["usm3DESEDEPrivProtocol"]
diff --git a/snmptrap/mod/trapd_stormwatch.py b/snmptrap/mod/trapd_stormwatch.py
index 4c374bb..d0ff5a7 100644
--- a/snmptrap/mod/trapd_stormwatch.py
+++ b/snmptrap/mod/trapd_stormwatch.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -65,8 +65,8 @@ def sw_init():
# # # # # # # # # # # # #
-# fx: sw_storm_active
-# - check if storm is active for agent/oid
+# fx: sw_clear_dicts
+# - clear out all dictionaries in stats
# - returns True if yes, False if no
# # # # # # # # # # # # #
def sw_clear_dicts():
@@ -97,7 +97,9 @@ def sw_clear_dicts():
if hasattr(sws, "sw_config_category"):
sws.sw_config_category.clear()
return True
+
except Exception as e:
+ print(f">>>> got exception {e}")
msg = "unable to reset stormwatch dictionaries - results will be indeterminate: %s" % (e)
ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg)
return False
@@ -123,7 +125,8 @@ def sw_load_trap_config(_config):
ret = sw_clear_dicts()
msg = "reset existing sws dictionaries to empty"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
- except NameError:
+
+ except (NameError, AttributeError):
msg = "sws dictionaries not present - initializing"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
ret = sw_init()
@@ -138,6 +141,7 @@ def sw_load_trap_config(_config):
)
msg = "metric_log_notification_threshold_pct value: %d" % stats.metric_log_notification_threshold_pct
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+
except Exception as e:
msg = "metric_log_notification_threshold_pct not present in config - default to 25"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg)
@@ -148,6 +152,7 @@ def sw_load_trap_config(_config):
sws.sw_interval_in_seconds = int(_config["trap_config"]["sw_interval_in_seconds"])
msg = "sw_interval_in_seconds value: %d" % sws.sw_interval_in_seconds
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+
except Exception as e:
msg = "sw_interval_in_seconds not present in config - default to 60 seconds"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg)
@@ -156,16 +161,18 @@ def sw_load_trap_config(_config):
# add trap configs from CBS json structure to running config
try:
notify_oids = _config["trap_config"]["notify_oids"]
+
except Exception as e:
msg = "no trap_config or notify_oids defined"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg)
- return False
+ return 0
trap_block_counter = 0
for trap_block in notify_oids:
# oid
try:
_oid = trap_block["oid"]
+
except Exception as e:
msg = "missing oid value in notify_oids - oid section of CBS config - using empty value, disregard entry"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg)
@@ -174,6 +181,7 @@ def sw_load_trap_config(_config):
# sw_high_water_in_interval
try:
_sw_high_water_in_interval = int(trap_block["sw_high_water_in_interval"])
+
except Exception as e:
msg = (
"missing sw_high_water_in_interval value in notify_oids - oid section of CBS config - using empty value"
@@ -184,6 +192,7 @@ def sw_load_trap_config(_config):
# sw_low_water_in_interval
try:
_sw_low_water_in_interval = int(trap_block["sw_low_water_in_interval"])
+
except Exception as e:
msg = (
"missing sw_low_water_in_interval value in notify_oids - oid section of CBS config - using empty value"
@@ -194,6 +203,7 @@ def sw_load_trap_config(_config):
# category
try:
_category = trap_block["category"]
+
except Exception as e:
msg = "missing category value in notify_oids - oid section of CBS config - using empty value"
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_WARN, tds.CODE_GENERAL, msg)
diff --git a/snmptrap/scheduler.sh b/snmptrap/scheduler.sh
index bb2d083..bf989b9 100755
--- a/snmptrap/scheduler.sh
+++ b/snmptrap/scheduler.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -56,10 +56,10 @@ _verbosity=$2
_log_message=$3
echo "`date` `date +%s` ${_fx} ${_error_code} ${_log_message}" >> ${log_fd}
-
+
# log_lines=`wc -l ${log_fd} | awk {'print $1'} | bc`
# log_lines=$((${log_lines}+1))
-
+
# if [ ${log_lines} -ge ${max_log_lines} ]
# then
# if [ -f ${log_fd} ]
@@ -178,9 +178,9 @@ do
run_daily_jobs
# reset last_day
last_day=${current_day}
- fi
- fi
- fi
+ fi
+ fi
+ fi
fi
sleep ${sleep_time}
done
diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py
index ddb6b32..d8d86e5 100644
--- a/snmptrap/snmptrapd.py
+++ b/snmptrap/snmptrapd.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -167,6 +167,7 @@ def load_all_configs(_signum, _frame):
def resolve_ip(_loc_ip_addr_str):
try:
+
if int(tds.dns_cache_ip_expires[_loc_ip_addr_str] < int(time.time())):
raise Exception(
"cache expired for %s at %d - updating value"
@@ -515,20 +516,21 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx):
# tds.trap_dict["notify OID"] = "." + str(enterprise) + ".0." + str(specific_trap)
# tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count('.')
# tds.trap_dict["sysUptime"] = variables['pdu']['time-stamp'].prettyPrint()
+
+ elif snmp_version == 2:
+ tds.trap_dict["protocol version"] = "v2c"
+
+ elif snmp_version == 3:
+ tds.trap_dict["protocol version"] = "v3"
+ # tds.trap_dict["security level"] = str(variables['securityLevel'])
+ # tds.trap_dict["context name"] = str(
+ # variables['contextName'].prettyPrint())
+ # tds.trap_dict["security name"] = str(variables['securityName'])
+ # tds.trap_dict["security engine"] = str(
+ # variables['contextEngineId'].prettyPrint())
+
else:
- if snmp_version == 2:
- tds.trap_dict["protocol version"] = "v2c"
- else:
- if snmp_version == 3:
- tds.trap_dict["protocol version"] = "v3"
- # tds.trap_dict["security level"] = str(variables['securityLevel'])
- # tds.trap_dict["context name"] = str(
- # variables['contextName'].prettyPrint())
- # tds.trap_dict["security name"] = str(variables['securityName'])
- # tds.trap_dict["security engine"] = str(
- # variables['contextEngineId'].prettyPrint())
- else:
- tds.trap_dict["protocol version"] = "unknown"
+ tds.trap_dict["protocol version"] = "unknown"
# tds.trap_dict['time received'] = epoch_msecond
tds.trap_dict["time received"] = epoch_second
@@ -672,15 +674,6 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName,
msg = "processing varbinds for %s" % (tds.trap_dict["uuid"])
ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
- # help(snmp_engine)
- # print(snmp_engine)
- # help(varBinds)
- # print(varBinds)
- # help(cbCtx)
- # print(cbCtx)
- # for key, val in cbCtx:
- # print(key, val)
-
# FMDL update reset location when batching publishes
pdu_varbind_count = 0
payload_varbinds = 0
@@ -735,6 +728,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName,
if tds.first_trap:
tds.all_traps_json_str = curr_trap_json_str
tds.trap_uuids_in_buffer = tds.trap_dict["uuid"]
+
tds.first_trap = False
else:
tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + ", " + tds.trap_dict["uuid"]
diff --git a/snmptrap/snmptrapd.sh b/snmptrap/snmptrapd.sh
index 717f660..6755c0c 100755
--- a/snmptrap/snmptrapd.sh
+++ b/snmptrap/snmptrapd.sh
@@ -2,7 +2,7 @@
# -*- indent-tabs-mode: nil -*- # vi: set expandtab:
#
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -66,7 +66,7 @@ fi
export PYTHONPATH=${bin_base_dir}/mod:$PYTHONPATH
# PYTHONUNBUFFERED:
-# set PYTHONUNBUFFERED to a non-empty string to avoid output buffering;
+# set PYTHONUNBUFFERED to a non-empty string to avoid output buffering;
# comment out for runtime environments/better performance!
# export PYTHONUNBUFFERED="True"
@@ -77,14 +77,14 @@ export CBS_SIM_JSON=${base_dir}/etc/snmptrapd.json
# misc
exit_after=1
-# # # # # # # # # #
+# # # # # # # # # #
# log_msg - log messages to stdout in standard manner
-# # # # # # # # # #
+# # # # # # # # # #
log_msg()
{
msg=$*
- echo "`date +%Y-%m-%dT%H:%M:%S,%N | cut -c1-23` ${msg}"
+ echo "`date +%Y-%m-%dT%H:%M:%S,%N | cut -c1-23` ${msg}"
}
#
@@ -96,7 +96,7 @@ process_name=$1
pid_file=$2
exec_cmd=$3
- # check if exec_cmd has a pid_file
+ # check if exec_cmd has a pid_file
if [ ! -r ${pid_file} ]
then
log_msg "Starting ${process_name}"
@@ -123,13 +123,13 @@ exec_cmd=$3
fi
}
-# # # # # # # # # #
+# # # # # # # # # #
# Start the service
-# # # # # # # # # #
+# # # # # # # # # #
start_service()
{
# Hints for startup modifications:
- # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+ # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
# handy for debug (e.g. docker logs output)
# log_msg "Runtime env present for ${current_module} placed in ${base_dir}/logs/${current_module}.out"
@@ -165,9 +165,9 @@ start_service()
fi
}
-# # # # # # # # # #
+# # # # # # # # # #
# Stop the service
-# # # # # # # # # #
+# # # # # # # # # #
stop_process()
{
process_name=$1
@@ -337,7 +337,7 @@ case "$1" in
status_service
wait
;;
- "stop")
+ "stop")
version
stop_service
;;
@@ -348,7 +348,7 @@ case "$1" in
start_service
status_service
;;
- "status")
+ "status")
version
status_service
;;
diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py
index dee2aa0..736e114 100644
--- a/tests/test_snmptrapd.py
+++ b/tests/test_snmptrapd.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,11 +14,17 @@
# limitations under the License.
# ============LICENSE_END=========================================================
+import copy
+import datetime
import os
-import pytest
import unittest
+from pathlib import Path
+import time
+from unittest.mock import patch, Mock
+
+import requests
+
import snmptrapd
-import datetime
import trapd_settings as tds
import trapd_stormwatch_settings as sws
@@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase):
Test the save_pid mod
"""
- pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
+ class WriteThrows():
+ def write():
+ raise RuntimeError("write() throws")
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ # init vars
+ tds.init()
+ sw.sw_init()
+
+ # fmt: off
+ test_snmptrapd.pytest_empty_data = "{}"
+ test_snmptrapd.pytest_json_data = (
+ '{ "snmptrapd": { '
+ ' "version": "2.0.3", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ ' "protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ ' "cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ ' "publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ ' "streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ ' "files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ ' "trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ ' "snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ ' ] } }'
+ )
+ # fmt: off
+
+ test_snmptrapd.trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "notify OID len": 10,
+ }
+
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ try:
+ Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+ # create copy of snmptrapd.json for pytest
+ test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
+ with open(test_snmptrapd.pytest_json_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_json_data)
+
+ test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json"
+ with open(test_snmptrapd.pytest_empty_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_empty_data)
- # create copy of snmptrapd.json for pytest
- pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
- with open(pytest_json_config, "w") as outfile:
- outfile.write(pytest_json_data)
def test_usage_err(self):
"""
Test usage error
"""
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- result = snmptrapd.usage_err()
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 1
+ with self.assertRaises(SystemExit) as exc:
+ snmptrapd.usage_err()
+ self.assertEqual(str(exc.exception), "1")
+
def test_load_all_configs(self):
"""
Test load of all configs
"""
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
- # init vars
- tds.init()
- sw.sw_init()
+ result = trapd_get_cbs_config.get_cbs_config()
+ self.assertEqual(result, True)
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ # request load of CBS data
+ self.assertEqual(snmptrapd.load_all_configs(0, 1), True)
+
+
+ def test_resolve_ip(self):
+ """ Test resolve_ip """
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ time_base = 1000000
+ time_offset = 10000
+ with patch('time.time', return_value=time_base):
+ self.assertEqual(time.time(), time_base)
+
+ fqdn = "foo.example"
+ ip = "1.2.3.4"
+ with patch.dict(tds.dns_cache_ip_to_name):
+ tds.dns_cache_ip_to_name = { }
+ # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires
+ # and gethostbyaddr() fails
+ with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")):
+ self.assertEqual(snmptrapd.resolve_ip(ip), ip)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], ip)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ tds.dns_cache_ip_to_name = { ip: fqdn }
+ with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])):
+ # EXCEPTION - nothing in tds.dns_cache_ip_expires
+ del tds.dns_cache_ip_expires[ip]
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset)
- # request load of CBS data
- result = snmptrapd.load_all_configs(0, 1)
- self.assertEqual(result, True)
def test_load_all_configs_signal(self):
"""
@@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+ # request load of CBS data
+ self.assertTrue(snmptrapd.load_all_configs(1, 1))
+
+ with patch('snmptrapd.get_cbs_config', return_value=False):
+ with self.assertRaises(SystemExit):
+ snmptrapd.load_all_configs(1, 1)
- # request load of CBS data
- result = snmptrapd.load_all_configs(1, 1)
- self.assertEqual(result, True)
def test_log_all_arriving_traps(self):
"""
Test logging of traps
"""
-
# init vars
tds.init()
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
-
- # set last day to current
- tds.last_day = datetime.datetime.now().day
-
- # trap dict for logging
- tds.trap_dict = {
- "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
- "agent address": "1.2.3.4",
- "agent name": "test-agent.nodomain.com",
- "cambria.partition": "test-agent.nodomain.com",
- "community": "",
- "community len": 0,
- "epoch_serno": 15222068260000,
- "protocol version": "v2c",
- "time received": 1522206826.2938566,
- "trap category": "ONAP-COLLECTOR-SNMPTRAP",
- "sysUptime": "218567736",
- "notify OID": "1.3.6.1.4.1.9999.9.9.999",
- "notify OID len": 10,
- }
+ # don't open files, but try to log - should raise exception
+ with self.assertRaises(Exception) as exc:
+ snmptrapd.log_all_arriving_traps()
+ self.assertIsInstance(exc.exception, TypeError)
- # open eelf logs
- trapd_io.open_eelf_logs()
-
- # open trap logs
- tds.arriving_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + (tds.c_config["files"]["arriving_traps_log"])
- )
- tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
-
- # name and open json trap log
- tds.json_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + "DMAAP_"
- + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
- + ".json"
- )
- tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
- msg = "published traps logged to: %s" % tds.json_traps_filename
- trapd_io.stdout_logger(msg)
- trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ result = trapd_get_cbs_config.get_cbs_config()
+
+ # set last day to current
+ tds.last_day = datetime.datetime.now().day
+
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
+
+ # open trap logs
+ tds.arriving_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + (tds.c_config["files"]["arriving_traps_log"])
+ )
+ tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
+
+ # name and open json trap log
+ tds.json_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + "DMAAP_"
+ + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
+ + ".json"
+ )
+ tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
+ msg = "published traps logged to: %s" % tds.json_traps_filename
+ trapd_io.stdout_logger(msg)
+ trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # also force it to daily roll
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, but with day rolling
+ tds.last_day = datetime.datetime.now().day - 1
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to minute
+ tds.last_minute = datetime.datetime.now().minute - 1
+ tds.c_config["files"]["roll_frequency"] = "minute"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to hour
+ tds.last_hour = datetime.datetime.now().hour - 1
+ tds.c_config["files"]["roll_frequency"] = "hour"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with a bad trap_dict[time_received]
+ tds.trap_dict["time received"] = "bad_value"
+ snmptrapd.log_all_arriving_traps()
+
+ # also test log_published_messages()
+ snmptrapd.log_published_messages("data #1")
+
+ # work even if there is an exception
+ # this SHOULD be done with a context
+ sv_json_traps_fd = tds.json_traps_fd
+ tds.json_traps_fd = test_snmptrapd.WriteThrows()
+ snmptrapd.log_published_messages("data #2")
+ tds.json_traps_fd = sv_json_traps_fd
- # don't open files, but try to log - should raise exception
- with pytest.raises(Exception) as pytest_wrapped_exception:
- result = snmptrapd.log_all_arriving_traps()
- assert pytest_wrapped_exception.type == AttributeError
def test_log_all_incorrect_log_type(self):
"""
@@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
- # open eelf logs
- trapd_io.open_eelf_logs()
def test_v1_trap_receipt(self):
"""
@@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
-
- errorIndication, errorStatus, errorIndex, varbinds = next(
- sendNotification(
- SnmpEngine(),
- CommunityData("not_public"),
- UdpTransportTarget(("localhost", 6162)),
- ContextData(),
- "trap",
- [
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
- ],
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ errorIndication, errorStatus, errorIndex, varbinds = next(
+ sendNotification(
+ SnmpEngine(),
+ CommunityData("not_public"),
+ UdpTransportTarget(("localhost", 6162)),
+ ContextData(),
+ "trap",
+ [
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
+ ],
+ )
)
- )
- result = errorIndication
- self.assertEqual(result, None)
+ result = errorIndication
+ self.assertEqual(result, None)
+
+
+ def test_post_dmaap(self):
+ """
+ Test post_dmaap()
+ """
+
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger:
+ with patch('requests.Session.post') as magic_session_post:
+ fake_post_resp = Mock()
+ magic_session_post.return_value = fake_post_resp
+
+ fake_post_resp.status_code = requests.codes.moved_permanently
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 11)
+
+ fake_post_resp.status_code = requests.codes.ok
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 16)
+
+ magic_ecomp_logger.call_count = 0
+ tds.traps_since_last_publish = 1
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username"
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password"
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ # for some reason this exception is being seen as an OSError ????
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = requests.exceptions.RequestException("test throw")
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = OSError()
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+
+ @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method")
+ def test_comm_string_rewrite_observer(self):
+ """
+ test comm_string_rewrite_observer()
+ """
+ vars = { "communityName": ["name"] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "public")
+
+ vars = { "communityName": [] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "")
+
+
+ def test_snmp_engine_observer_cb(self):
+ """
+ test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx):
+ """
+
+ snmp_engine = "snmp_engine"
+ execpoint = "execpoint"
+ cbCtx = "cbCtx"
+ variables = {
+ 'transportDomain': [ 1, 2, 3 ],
+ 'transportAddress': [ "a", "b", "c" ]
+ }
+ for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]:
+ variables["securityModel"] = secmodel
+ snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx)
+ self.assertEqual(tds.trap_dict["protocol version"], ret)
-if __name__ == "__main__":
+ def test_add_varbind_to_log_string(self):
+ """
+ test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val)
+ """
+ vb_oid = "vb_oid"
+ vb_type = "vb_type"
+
+ class TempPP():
+ def prettyPrint(self):
+ return "pp ret"
+
+ vb_val = TempPP()
+
+ self.assertEqual(tds.all_vb_str, "")
+
+ snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret')
+
+ snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret')
+
+
+ def test_add_varbind_to_json(self):
+ """
+ test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val)
+ """
+
+ class TempPP():
+ def __init__(self, ret):
+ self.ret = ret
+ def prettyPrint(self):
+ return self.ret
+
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ vb_oid = TempPP("1.2.3")
+ override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0")
+
+ vb_type = "vb_type"
+
+ vb_val = TempPP("foo.example")
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["notify OID"], ".foo.example")
+ self.assertEqual(tds.trap_dict["notify OID len"], 2)
+
+ with patch('snmptrapd.resolve_ip') as magic_resolve_ip:
+ magic_resolve_ip.return_value = 'foo.example'
+ self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["agent address"], "foo.example")
+ self.assertEqual(tds.trap_dict["agent name"], "foo.example")
+
+ sv_protocol_version = tds.trap_dict["protocol version"]
+ tds.trap_dict["protocol version"] = "v1"
+ self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1)
+
+ tds.trap_dict["protocol version"] = sv_protocol_version
+ self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+
+ @patch('snmptrapd.log_all_arriving_traps')
+ @patch('snmptrapd.post_dmaap', return_value = 0)
+ @patch('snmptrapd.ecomp_logger', return_value = 0)
+ @patch('snmptrapd.add_varbind_to_json', return_value = 1)
+ @patch('snmptrapd.add_varbind_to_log_string', return_value = 0)
+ def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json,
+ magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps):
+ """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=True):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertFalse(tds.first_trap)
+ self.assertEqual(magic_ecomp_logger.call_count, 8)
+
+ magic_ecomp_logger.call_count = 0
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 1
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 100
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0
+ tds.last_pub_time = 0
+ with patch('time.time', return_value=0):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.last_pub_time = 100000
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1
+ with patch('time.time', return_value=10):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py
index 0e38461..371b308 100644
--- a/tests/test_trapd_exit.py
+++ b/tests/test_trapd_exit.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -29,27 +28,26 @@ class test_cleanup_and_exit(unittest.TestCase):
def test_normal_exit(self):
"""
- Test normal exit works as expected
+ Test normal exit works as expected, and exits with the 1st arg
"""
- open(pid_file, "w")
+ # create an empty pid file
+ with open(pid_file, "w"):
+ pass
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+ with self.assertRaises(SystemExit) as exc:
result = trapd_exit.cleanup_and_exit(0, pid_file)
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 0
+ self.assertEqual(str(exc.exception), "0")
- # compare = str(result).startswith("SystemExit: 0")
- # self.assertEqual(compare, True)
def test_abnormal_exit(self):
"""
- Test exit with missing PID file exits non-zero
+ Test exit with missing PID file. Still exits with the 1st arg.
"""
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+
+ with self.assertRaises(SystemExit) as exc:
result = trapd_exit.cleanup_and_exit(0, pid_file_dne)
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 1
+ self.assertEqual(str(exc.exception), "0")
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py
index b7c5d7b..4064b78 100644
--- a/tests/test_trapd_get_cbs_config.py
+++ b/tests/test_trapd_get_cbs_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,101 +14,198 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
-import unittest
+import json
import os
import sys
+import unittest
+from unittest.mock import patch
+from pathlib import Path
-from onap_dcae_cbs_docker_client.client import get_config
-from trapd_exit import cleanup_and_exit
-from trapd_io import stdout_logger, ecomp_logger
-import trapd_settings as tds
import trapd_get_cbs_config
-from pathlib import Path
-# # # # # # # #
-# ENV setup
-# # # # # # # #
-
-# required directory tree
-try:
- Path("/tmp/opt/app/snmptrap/logs").mkdir(parents=True, exist_ok=True)
- Path("/tmp/opt/app/snmptrap/tmp").mkdir(parents=True, exist_ok=True)
-except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
-
-# env var for CBS_SIM_JSON
-try:
- os.environ["CBS_SIM_JSON"] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
-except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
-
-pytest_json_data = '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
-
-# create snmptrapd.json for pytest
-pytest_json_config = os.getenv("CBS_SIM_JSON")
-with open(pytest_json_config, "w") as outfile:
- outfile.write(pytest_json_data)
-outfile.close()
-
-# test class/methods
-class test_get_cbs_config(unittest.TestCase):
+class test_trapd_get_cbs_config(unittest.TestCase):
"""
Test the trapd_get_cbs_config mod
"""
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ json_dir = snmptrap_dir + "/etc"
+
+ # fmt: off
+ pytest_json_data = json.loads(
+ '{'
+ '"snmptrapd": { '
+ ' "version": "1.4.0", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ '"protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ '"cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ '"publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ '"streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ '"files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ '"trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ '"snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ '] } }'
+ )
+ # fmt: on
+
+
+ @classmethod
+ def setUpClass(cls):
+ """ set up the required directory tree """
+ try:
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+
+ def write_config(self, filename, config):
+ """
+ write a config file
+ """
+ # create snmptrapd.json for pytest
+ with open(filename, "w") as outfile:
+ json.dump(config, outfile)
+
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"})
def test_cbs_fallback_env_present(self):
"""
Test that CBS fallback env variable exists and we can get config
from fallback env var
"""
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, True)
- self.assertEqual(result, True)
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json"
+ self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", test_trapd_get_cbs_config.pytest_json_data)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"})
+ def test_cbs_fallback_env_present_bad_numbers(self):
+ """
+ Test as in test_cbs_fallback_env_present(), but with
+ various values reset to be non-numeric.
+ """
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json"
+ with patch.dict(test_trapd_get_cbs_config.pytest_json_data):
+ test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_milliseconds_between_retries"] = "notanumber"
+ test_trapd_get_cbs_config.pytest_json_data["files"]["minimum_severity_to_log"] = "notanumber"
+ test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_retries"] = "notanumber"
+ self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json",
+ test_trapd_get_cbs_config.pytest_json_data)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/nosuchfile.json"})
def test_cbs_override_env_invalid(self):
""" """
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/nosuchfile.json")
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/nosuchfile.json"
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+ with self.assertRaises(SystemExit) as exc:
result = trapd_get_cbs_config.get_cbs_config()
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
+ self.assertEqual(str(exc.exception), "1")
+
+ @patch.dict(os.environ, {"CONSUL_HOST": "localhost"})
def test_cbs_env_present(self):
"""
Test that CONSUL_HOST env variable exists but fails to
respond
"""
- os.environ.update(CONSUL_HOST="localhost")
+ self.assertEqual(os.getenv("CONSUL_HOST"), "localhost")
+
del os.environ["CBS_SIM_JSON"]
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ self.assertNotIn("CBS_SIM_JSON", os.environ)
- with pytest.raises(SystemExit) as sys_exit:
+ with self.assertRaises(SystemExit) as exc:
trapd_get_cbs_config.get_cbs_config()
- assert sys_exit.value.errno == errno.ECONNREFUSED
+
+ @patch.dict(os.environ, {})
def test_cbs_override_env_undefined(self):
""" """
- print("------>>> RUNNING test_no_cbs_override_env_var:")
del os.environ["CBS_SIM_JSON"]
+ self.assertNotIn("CBS_SIM_JSON", os.environ)
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- assert trapd_get_cbs_config.get_cbs_config() == SystemExit
+ with self.assertRaises(SystemExit) as exc:
+ trapd_get_cbs_config.get_cbs_config()
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py
index 7ec89a6..667a454 100644
--- a/tests/test_trapd_http_session.py
+++ b/tests/test_trapd_http_session.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,9 +14,11 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_http_session
+import requests
+from unittest.mock import Mock, patch
+import trapd_settings as tds
class test_init_session_obj(unittest.TestCase):
@@ -24,39 +26,62 @@ class test_init_session_obj(unittest.TestCase):
Test the init_session_obj mod
"""
- def close_nonexisting_session(self):
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+ def test_init_session_obj(self):
"""
- test close of existing http session
+ test creation of http session
"""
- sess = "no session"
- result = trapd_http_session.close_session_obj(sess)
- self.assertEqual(result, True)
+ self.assertIsInstance(trapd_http_session.init_session_obj(), requests.sessions.Session)
+
- def init_session(self):
+ def test_init_session_obj_raises(self):
"""
- test creation of http session
+ test close when the requests.Session() method throws an exception
+ """
+ with patch('requests.Session') as magic_requests:
+ magic_requests.side_effect = RuntimeError("Session() throws via mock")
+ with self.assertRaises(SystemExit) as exc:
+ trapd_http_session.init_session_obj()
+ self.assertEqual(str(exc.exception), "1")
+
+
+ def test_close_nonexisting_session(self):
"""
- result = trapd_http_session.init_session_obj()
- compare = str(result).startswith("<requests.sessions.Session object at")
- self.assertEqual(compare, True)
+ test close of non-existing http session
+ """
+ self.assertIsNone(trapd_http_session.close_session_obj(None))
+
+
+ def test_close_nonexisting_close_raises(self):
+ """
+ test close when the session.close() method throws
+ """
+ class CloseThrows():
+ def close(self):
+ raise RuntimeError("close() throws")
+
+ with self.assertRaises(SystemExit):
+ trapd_http_session.close_session_obj(CloseThrows())
+
def test_reset(self):
"""
test reset of existing http session
"""
sess = trapd_http_session.init_session_obj()
- result = trapd_http_session.reset_session_obj(sess)
- compare = str(result).startswith("<requests.sessions.Session object at")
- self.assertEqual(compare, True)
+ self.assertIsInstance(trapd_http_session.reset_session_obj(sess), requests.sessions.Session)
+
- def close_existing_session(self):
+ def test_close_existing_session(self):
"""
test close of existing http session
"""
sess = trapd_http_session.init_session_obj()
- result = trapd_http_session.close_session_obj(sess)
- self.assertEqual(result, True)
+ self.assertTrue(trapd_http_session.close_session_obj(sess))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_io.py b/tests/test_trapd_io.py
index c1702aa..f3d1a4c 100644
--- a/tests/test_trapd_io.py
+++ b/tests/test_trapd_io.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +14,21 @@
# limitations under the License.
# ============LICENSE_END=========================================================
+import datetime
+import glob
+import io
+import json
import os
-import pytest
+from pathlib import Path
+import sys
+import tempfile
import unittest
+from unittest.mock import patch
+
import snmptrapd
-import datetime
-import json
import trapd_settings as tds
import trapd_runtime_pid
import trapd_io
-import sys
-from pathlib import Path
class test_trapd_io(unittest.TestCase):
@@ -32,32 +36,141 @@ class test_trapd_io(unittest.TestCase):
Test the save_pid mod
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
- )
+ class PseudoFile():
+ """ test file-like object that does nothing """
+ def write(self):
+ pass
+ def close(self):
+ pass
- def test_roll_all_files_notopen(self):
- """
- Test rolling files that aren't open
- """
+ class WriteThrows():
+ """ test file-like object that throws on a write """
+ def write(self):
+ raise RuntimeError("close() throws")
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ tds.init()
+
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ try:
+ Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+ # fmt: off
+ tds.c_config = json.loads(
+ '{ "snmptrapd": { '
+ ' "version": "1.4.0", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ '"protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ '"cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ '"publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ '"streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ '"files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ '"trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ '"snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ '] } }'
+ )
+ # fmt: on
+ tds.json_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json"
+ )
+ tds.arriving_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log"
+ )
- # try to roll logs when not open
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.roll_all_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_error_file(self):
"""
Test bad error file location
"""
- # open eelf error logs
- tds.c_config["files.eelf_error"] = "/bad_dir/error.log"
+ with patch.dict(tds.c_config["files"]):
+ # open eelf error logs
+ tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_debug_file(self):
"""
@@ -65,86 +178,248 @@ class test_trapd_io(unittest.TestCase):
"""
# open eelf debug logs
- tds.c_config["files.eelf_debug"] = "/bad_dir/debug.log"
+ with patch.dict(tds.c_config["files"]):
+ tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_audit_file(self):
"""
Test bad audit file location
"""
- # open eelf debug logs
- tds.c_config["files.eelf_audit"] = "/bad_dir/audit.log"
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_metrics_file(self):
"""
Test bad metrics file location
"""
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_error_file_missing_name(self):
+ """
+ Test bad error file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf error logs
+ del tds.c_config["files"]["eelf_error"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_debug_file_missing_name(self):
+ """
+ Test bad debug file location
+ """
+
# open eelf debug logs
- tds.c_config["files.eelf_metrics"] = "/bad_dir/metrics.log"
+ with patch.dict(tds.c_config["files"]):
+ del tds.c_config["files"]["eelf_debug"]
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- def test_roll_all_logs(self):
+
+ def test_open_eelf_audit_file_missing_name(self):
+ """
+ Test bad audit file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ del tds.c_config["files"]["eelf_audit"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_metrics_file_missing_name(self):
+ """
+ Test bad metrics file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ del tds.c_config["files"]["eelf_metrics"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_roll_all_logs_not_open(self):
"""
Test roll of logs when not open
"""
- # try to roll logs when not open
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.roll_all_logs()
- assert pytest_wrapped_exception.type == SystemExit
+ # try to roll logs when not open. Shouldn't fail
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+ def test_roll_all_logs(self):
+ """
+ Test rolling files that they are open
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_roll_file_throws(self):
+ """
+ Test rolling files that they are open
+ but roll_file throws an exception
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.roll_file') as roll_file_throws:
+ roll_file_throws.side_effect = RuntimeError("roll_file() throws")
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_eelf_logs_returns_false(self):
+ """
+ Test rolling files that they are open
+ but open_eelf_logs returns false
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws:
+ open_eelf_logs_throws.return_value = False
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_file_json_traps_throws(self):
+ """
+ Test rolling files that they are open
+ but open_file(json_traps_filename) throws an exception
+ """
+
+ def tmp_func(nm):
+ if nm == tds.json_traps_filename:
+ raise RuntimeError("json_traps_filename throws")
+ return test_trapd_io.PseudoFile()
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_file') as open_file_throws:
+ open_file_throws.side_effect = tmp_func
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_file_arriving_traps_throws(self):
+ """
+ Test rolling files that they are open
+ but open_file(arriving_traps_filename) throws an exception
+ """
+
+ def tmp_func(nm):
+ if nm == tds.arriving_traps_filename:
+ raise RuntimeError("arriving_traps_filename throws")
+ return test_trapd_io.PseudoFile()
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_file') as open_file_throws:
+ open_file_throws.side_effect = tmp_func
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
def test_roll_file(self):
"""
Test roll of individual file when not present
"""
+ # try to roll a valid log file
+ with tempfile.TemporaryDirectory() as ntd:
+ fn = ntd + "/test.log"
+ with open(fn, "w") as ofp:
+ self.assertTrue(trapd_io.roll_file(fn))
+ # The file will be renamed to something like
+ # test.log.2022-08-17T20:28:32
+ self.assertFalse(os.path.exists(fn))
+ # We could also add a test to see if there is a file
+ # with a name like that.
+ files = list(glob.glob(f"{ntd}/*"))
+ print(f"files={files}")
+ self.assertEqual(len(files), 1)
+ self.assertTrue(files[0].startswith(fn + "."))
+
+
+ def test_roll_file_not_present(self):
+ """
+ Test roll of individual file when not present
+ """
+
# try to roll logs when not open
- result = trapd_io.roll_file("/file/not/present")
- self.assertEqual(result, False)
+ self.assertFalse(trapd_io.roll_file("/file/not/present"))
+
def test_roll_file_no_write_perms(self):
"""
try to roll logs when not enough perms
"""
- no_perms_dir = "/tmp/opt/app/snmptrap/no_perms"
- no_perms_file = "test.dat"
- no_perms_fp = no_perms_dir + "/" + no_perms_file
+ with tempfile.TemporaryDirectory() as no_perms_dir:
+ # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms"
+ no_perms_file = "test.dat"
+ no_perms_fp = no_perms_dir + "/" + no_perms_file
- # required directory tree
- try:
- Path(no_perms_dir).mkdir(parents=True, exist_ok=True)
- os.chmod(no_perms_dir, 0o777)
- except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
+ # required directory tree
+ #try:
+ # Path(no_perms_dir).mkdir(parents=True, exist_ok=True)
+ # os.chmod(no_perms_dir, 0o700)
+ #except Exception as e:
+ # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+
+ # create empty file
+ open(no_perms_fp, "w").close()
+ os.chmod(no_perms_dir, 0o555)
- # create empty file
- open(no_perms_fp, "a").close()
- os.chmod(no_perms_dir, 0o444)
+ # try to roll file in dir with no write perms
+ self.assertFalse(trapd_io.roll_file(no_perms_fp))
- result = trapd_io.roll_file(no_perms_fp)
- self.assertEqual(result, False)
+ # the file should still be there
+ open(no_perms_fp).close()
+
+ # allow the directory to be removed
+ os.chmod(no_perms_dir, 0o700)
- # try to roll file in dir with no write perms
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_file(no_perms_fp)
- assert pytest_wrapped_exception.type == SystemExit
def test_open_file_exists(self):
"""
@@ -156,8 +431,9 @@ class test_trapd_io(unittest.TestCase):
# try to roll logs when not open
result = trapd_io.open_file(test_file)
- compare = str(result).startswith("<_io.TextIOWrapper name=")
- self.assertEqual(compare, True)
+ self.assertTrue(str(result).startswith("<_io.TextIOWrapper name="))
+ self.assertIsInstance(result, io.TextIOWrapper)
+
def test_open_file_exists_does_not_exist(self):
"""
@@ -168,9 +444,9 @@ class test_trapd_io(unittest.TestCase):
test_file = "/tmp/no_such_dir/snmptrap_pytest"
# try to open file when dir not present
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
+ with self.assertRaises(SystemExit):
result = trapd_io.open_file(test_file)
- assert pytest_wrapped_exception.type == SystemExit
+
def test_close_file_exists(self):
"""
@@ -182,18 +458,113 @@ class test_trapd_io(unittest.TestCase):
test_file = trapd_io.open_file(test_file_name)
# close active file
- result = trapd_io.close_file(test_file, test_file_name)
- self.assertEqual(result, True)
+ self.assertTrue(trapd_io.close_file(test_file, test_file_name))
+
- def test_close_file_does_not_exists(self):
+ def test_close_file_does_not_exist(self):
"""
Test closing non-existent file
"""
# try to roll logs when not open
- result = trapd_io.close_file(None, None)
- self.assertEqual(result, False)
+ self.assertFalse(trapd_io.close_file(None, None))
+
+
+ def test_ecomp_logger_type_error(self):
+ """
+ test trapd_io.ecomp_logger
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_error_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_error_fd = tds.eelf_error_fd
+ tds.eelf_error_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_error_fd = sv_eelf_error_fd
+
+
+ def test_ecomp_logger_type_unknown_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger, unknown type, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_error_fd = tds.eelf_error_fd
+ tds.eelf_error_fd = test_trapd_io.WriteThrows()
+ self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_error_fd = sv_eelf_error_fd
+
+
+ def test_ecomp_logger_type_metrics(self):
+ """
+ test trapd_io.ecomp_logger to metrics
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_metrics_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger to metrics, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_metrics_fd = tds.eelf_metrics_fd
+ tds.eelf_metrics_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_metrics_fd = sv_eelf_metrics_fd
+
+
+ def test_ecomp_logger_type_audit(self):
+ """
+ test trapd_io.ecomp_logger to audit log
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_audit_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger to audit log, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_audit_fd = tds.eelf_audit_fd
+ tds.eelf_audit_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_audit_fd = sv_eelf_audit_fd
+
+
+ def test_ecomp_logger_type_unknown(self):
+ """
+ test trapd_io.ecomp_logger
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py
index c6b8601..923f730 100644
--- a/tests/test_trapd_runtime_pid.py
+++ b/tests/test_trapd_runtime_pid.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,24 +14,30 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
+import os
import unittest
-import trapd_runtime_pid
+from unittest.mock import patch
+
import trapd_io
+import trapd_runtime_pid
-class test_save_pid(unittest.TestCase):
+class test_trapd_runtime_pid(unittest.TestCase):
"""
Test the save_pid mod
"""
+ GOOD_PID_FILE = "/tmp/snmptrap_test_pid_file"
+ BAD_PID_FILE = "/tmp/snmptrap_test_pid_file_not_there"
+
def test_correct_usage(self):
"""
Test that attempt to create pid file in standard location works
"""
- result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
+ result = trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)
self.assertEqual(result, True)
+
def test_missing_directory(self):
"""
Test that attempt to create pid file in missing dir fails
@@ -40,7 +46,6 @@ class test_save_pid(unittest.TestCase):
self.assertEqual(result, False)
-class test_rm_pid(unittest.TestCase):
"""
Test the rm_pid mod
"""
@@ -50,18 +55,26 @@ class test_rm_pid(unittest.TestCase):
Test that attempt to remove pid file in standard location works
"""
# must create it before removing it
- result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
- self.assertEqual(result, True)
- result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file")
- self.assertEqual(result, True)
+ self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+ self.assertTrue(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+
def test_missing_file(self):
"""
Test that attempt to rm non-existent pid file fails
"""
- result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999")
- self.assertEqual(result, False)
+ self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.BAD_PID_FILE))
+
+
+ def test_correct_usage_but_throws(self):
+ """
+ Test that an exception while removing returns false
+ """
+ self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+ with patch('os.remove') as mock_remove:
+ mock_remove.side_effect = RuntimeError("os.remove throws")
+ self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py
index 92a3144..5625d78 100644
--- a/tests/test_trapd_settings.py
+++ b/tests/test_trapd_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -24,66 +23,43 @@ pid_file_dne = "/tmp/test_pid_file_NOT"
import trapd_settings as tds
-class test_cleanup_and_exit(unittest.TestCase):
+class test_trapd_settings(unittest.TestCase):
"""
Test for presense of required vars
"""
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+
def test_nonexistent_dict(self):
"""
Test nosuch var
"""
- tds.init()
- try:
- tds.no_such_var
- result = True
- except:
- result = False
+ self.assertFalse(hasattr(tds, 'no_such_var'))
- self.assertEqual(result, False)
def test_config_dict(self):
"""
Test config dict
"""
- tds.init()
- try:
- tds.c_config
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(tds, 'c_config'))
- self.assertEqual(result, True)
def test_dns_cache_ip_to_name(self):
"""
Test dns cache name dict
"""
+ self.assertTrue(hasattr(tds, 'dns_cache_ip_to_name'))
- tds.init()
- try:
- tds.dns_cache_ip_to_name
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
def test_dns_cache_ip_expires(self):
"""
Test dns cache ip expires dict
"""
-
- tds.init()
- try:
- tds.dns_cache_ip_expires
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
+ self.assertTrue(hasattr(tds, 'dns_cache_ip_expires'))
-if __name__ == "__main__":
- # tds.init()
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_snmpv3.py b/tests/test_trapd_snmpv3.py
index eac6082..2011953 100644
--- a/tests/test_trapd_snmpv3.py
+++ b/tests/test_trapd_snmpv3.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,10 +14,10 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import json
import unittest
import os
+import traceback
from onap_dcae_cbs_docker_client.client import get_config
from trapd_exit import cleanup_and_exit
@@ -33,48 +33,459 @@ class test_snmpv3_config(unittest.TestCase):
Test snmpv3 module
"""
+ JSON_START = (
+ '{'
+ ' "snmptrapd": {'
+ ' "version": "2.0",'
+ ' "title": "ONAP SNMP Trap Receiver"'
+ ' },'
+ ' "protocols": {'
+ ' "transport": "udp",'
+ ' "ipv4_interface": "0.0.0.0",'
+ ' "ipv4_port": 6162,'
+ ' "ipv6_interface": "::1",'
+ ' "ipv6_port": 6162'
+ ' },'
+ ' "cache": {'
+ ' "dns_cache_ttl_seconds": 10800'
+ ' },'
+ ' "publisher": {'
+ ' "http_milliseconds_timeout": 500,'
+ ' "http_retries": 2,'
+ ' "http_milliseconds_between_retries": 250,'
+ ' "http_primary_publisher": "true",'
+ ' "http_peer_publisher": "unavailable",'
+ ' "max_traps_between_publishes": 10,'
+ ' "max_milliseconds_between_publishes": 10000'
+ ' },'
+ ' "streams_publishes": {'
+ ' "sec_fault_unsecure": {'
+ ' "type": "message_router",'
+ ' "aaf_password": null,'
+ ' "dmaap_info": {'
+ ' "location": "mtl5",'
+ ' "client_id": null,'
+ ' "client_role": null,'
+ ' "topic_url": "http://localhost:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP"'
+ ' },'
+ ' "aaf_username": null'
+ ' }'
+ ' },'
+ ' "files": {'
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap",'
+ ' "log_dir": "logs",'
+ ' "data_dir": "data",'
+ ' "pid_dir": "tmp",'
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log",'
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log",'
+ ' "traps_stats_log": "snmptrapd_stats.csv",'
+ ' "perm_status_file": "snmptrapd_status.log",'
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs",'
+ ' "eelf_error": "error.log",'
+ ' "eelf_debug": "debug.log",'
+ ' "eelf_audit": "audit.log",'
+ ' "eelf_metrics": "metrics.log",'
+ ' "roll_frequency": "hour",'
+ ' "minimum_severity_to_log": 3'
+ ' },'
+ ' "check_hb_traps": {'
+ ' "trap_thr": 900,'
+ ' "hb_thr": 900,'
+ ' "hb_notify_oid": ".1.3.6.1.4.1.74.2.46.12.1.1"'
+ ' },'
+ ' "trap_config": {'
+ ' "sw_interval_in_seconds": 60,'
+ ' "metric_log_notification_threshold_pct": 25,'
+ ' "notify_oids": ['
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.1",'
+ ' "sw_high_water_in_interval": 100,'
+ ' "sw_low_water_in_interval": 5,'
+ ' "category": "logonly"'
+ ' },'
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.2",'
+ ' "sw_high_water_in_interval": 200,'
+ ' "sw_low_water_in_interval": 10,'
+ ' "category": "logonly"'
+ ' },'
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.3",'
+ ' "sw_high_water_in_interval": 300,'
+ ' "sw_low_water_in_interval": 15,'
+ ' "category": "logonly"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_USERS = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "user": "user1",'
+ ' "engineId": "8000000000000001",'
+ ' "usmHMACMD5AuthProtocol": "authkey1",'
+ ' "usmDESPrivProtocol": "privkey1"'
+ ' },'
+ ' {'
+ ' "user": "user2",'
+ ' "engineId": "8000000000000002",'
+ ' "usmHMACMD5AuthProtocol": "authkey2",'
+ ' "usm3DESEDEPrivProtocol": "privkey2"'
+ ' },'
+ ' {'
+ ' "user": "user3",'
+ ' "engineId": "8000000000000003",'
+ ' "usmHMACMD5AuthProtocol": "authkey3",'
+ ' "usmAesCfb128Protocol": "privkey3"'
+ ' },'
+ ' {'
+ ' "user": "user4",'
+ ' "engineId": "8000000000000004",'
+ ' "usmHMACMD5AuthProtocol": "authkey4",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey4"'
+ ' },'
+ ' {'
+ ' "user": "user5",'
+ ' "engineId": "8000000000000005",'
+ ' "usmHMACMD5AuthProtocol": "authkey5",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey5"'
+ ' },'
+ ' {'
+ ' "user": "user6",'
+ ' "engineId": "8000000000000006",'
+ ' "usmHMACMD5AuthProtocol": "authkey6",'
+ ' "usmAesCfb192Protocol": "privkey6"'
+ ' },'
+ ' {'
+ ' "user": "user7",'
+ ' "engineId": "8000000000000007",'
+ ' "usmHMACMD5AuthProtocol": "authkey7",'
+ ' "usmAesCfb256Protocol": "privkey7"'
+ ' },'
+ ' {'
+ ' "user": "user9",'
+ ' "engineId": "8000000000000009",'
+ ' "usmHMACSHAAuthProtocol": "authkey9",'
+ ' "usmDESPrivProtocol": "privkey9"'
+ ' },'
+ ' {'
+ ' "user": "user10",'
+ ' "engineId": "8000000000000010",'
+ ' "usmHMACSHAAuthProtocol": "authkey10",'
+ ' "usm3DESEDEPrivProtocol": "privkey10"'
+ ' },'
+ ' {'
+ ' "user": "user11",'
+ ' "engineId": "8000000000000011",'
+ ' "usmHMACSHAAuthProtocol": "authkey11",'
+ ' "usmAesCfb128Protocol": "privkey11"'
+ ' },'
+ ' {'
+ ' "user": "user12",'
+ ' "engineId": "8000000000000012",'
+ ' "usmHMACSHAAuthProtocol": "authkey12",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey12"'
+ ' },'
+ ' {'
+ ' "user": "user13",'
+ ' "engineId": "8000000000000013",'
+ ' "usmHMACSHAAuthProtocol": "authkey13",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey13"'
+ ' },'
+ ' {'
+ ' "user": "user14",'
+ ' "engineId": "8000000000000014",'
+ ' "usmHMACSHAAuthProtocol": "authkey14",'
+ ' "usmAesCfb192Protocol": "privkey14"'
+ ' },'
+ ' {'
+ ' "user": "user15",'
+ ' "engineId": "8000000000000015",'
+ ' "usmHMACSHAAuthProtocol": "authkey15",'
+ ' "usmAesCfb256Protocol": "privkey15"'
+ ' },'
+ ' {'
+ ' "user": "user17",'
+ ' "engineId": "8000000000000017",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey17",'
+ ' "usmDESPrivProtocol": "privkey17"'
+ ' },'
+ ' {'
+ ' "user": "user18",'
+ ' "engineId": "8000000000000018",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey18",'
+ ' "usm3DESEDEPrivProtocol": "privkey18"'
+ ' },'
+ ' {'
+ ' "user": "user19",'
+ ' "engineId": "8000000000000019",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey19",'
+ ' "usmAesCfb128Protocol": "privkey19"'
+ ' },'
+ ' {'
+ ' "user": "user20",'
+ ' "engineId": "8000000000000020",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey20",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey20"'
+ ' },'
+ ' {'
+ ' "user": "user21",'
+ ' "engineId": "8000000000000021",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey21",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey21"'
+ ' },'
+ ' {'
+ ' "user": "user22",'
+ ' "engineId": "8000000000000022",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey22",'
+ ' "usmAesCfb192Protocol": "privkey22"'
+ ' },'
+ ' {'
+ ' "user": "user23",'
+ ' "engineId": "8000000000000023",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey23",'
+ ' "usmAesCfb256Protocol": "privkey23"'
+ ' },'
+ ' {'
+ ' "user": "user25",'
+ ' "engineId": "8000000000000025",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey25",'
+ ' "usmDESPrivProtocol": "privkey25"'
+ ' },'
+ ' {'
+ ' "user": "user26",'
+ ' "engineId": "8000000000000026",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey26",'
+ ' "usm3DESEDEPrivProtocol": "privkey26"'
+ ' },'
+ ' {'
+ ' "user": "user27",'
+ ' "engineId": "8000000000000027",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey27",'
+ ' "usmAesCfb128Protocol": "privkey27"'
+ ' },'
+ ' {'
+ ' "user": "user28",'
+ ' "engineId": "8000000000000028",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey28",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey28"'
+ ' },'
+ ' {'
+ ' "user": "user29",'
+ ' "engineId": "8000000000000029",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey29",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey29"'
+ ' },'
+ ' {'
+ ' "user": "user30",'
+ ' "engineId": "8000000000000030",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey30",'
+ ' "usmAesCfb192Protocol": "privkey30"'
+ ' },'
+ ' {'
+ ' "user": "user31",'
+ ' "engineId": "8000000000000031",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey31",'
+ ' "usmAesCfb256Protocol": "privkey31"'
+ ' },'
+ ' {'
+ ' "user": "user33",'
+ ' "engineId": "8000000000000033",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey33",'
+ ' "usmDESPrivProtocol": "privkey33"'
+ ' },'
+ ' {'
+ ' "user": "user34",'
+ ' "engineId": "8000000000000034",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey34",'
+ ' "usm3DESEDEPrivProtocol": "privkey34"'
+ ' },'
+ ' {'
+ ' "user": "user35",'
+ ' "engineId": "8000000000000035",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey35",'
+ ' "usmAesCfb128Protocol": "privkey35"'
+ ' },'
+ ' {'
+ ' "user": "user36",'
+ ' "engineId": "8000000000000036",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey36",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey36"'
+ ' },'
+ ' {'
+ ' "user": "user37",'
+ ' "engineId": "8000000000000037",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey37",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey37"'
+ ' },'
+ ' {'
+ ' "user": "user38",'
+ ' "engineId": "8000000000000038",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey38",'
+ ' "usmAesCfb192Protocol": "privkey38"'
+ ' },'
+ ' {'
+ ' "user": "user39",'
+ ' "engineId": "8000000000000039",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey39",'
+ ' "usmAesCfb256Protocol": "privkey39"'
+ ' },'
+ ' {'
+ ' "user": "user41",'
+ ' "engineId": "8000000000000041",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey41",'
+ ' "usmDESPrivProtocol": "privkey41"'
+ ' },'
+ ' {'
+ ' "user": "user42",'
+ ' "engineId": "8000000000000042",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey42",'
+ ' "usm3DESEDEPrivProtocol": "privkey42"'
+ ' },'
+ ' {'
+ ' "user": "user43",'
+ ' "engineId": "8000000000000043",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey43",'
+ ' "usmAesCfb128Protocol": "privkey43"'
+ ' },'
+ ' {'
+ ' "user": "user44",'
+ ' "engineId": "8000000000000044",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey44",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey44"'
+ ' },'
+ ' {'
+ ' "user": "user45",'
+ ' "engineId": "8000000000000045",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey45",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey45"'
+ ' },'
+ ' {'
+ ' "user": "user46",'
+ ' "engineId": "8000000000000046",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey46",'
+ ' "usmAesCfb192Protocol": "privkey46"'
+ ' },'
+ ' {'
+ ' "user": "user47",'
+ ' "engineId": "8000000000000047",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey47",'
+ ' "usmAesCfb256Protocol": "privkey47"'
+ ' },'
+ ' {'
+ ' "user": "user48",'
+ ' "engineId": "8000000000000048",'
+ ' "usmNoAuthProtocol": "authkey48",'
+ ' "usmNoPrivProtocol": "privkey48"'
+ ' },'
+ ' {'
+ ' "user": "user49",'
+ ' "engineId": "8000000000000049",'
+ ' "unknownAuthProtocol": "authkey49",'
+ ' "unknownProtocol": "privkey49"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_MISSING_USER = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "baduser": "user50",'
+ ' "engineId": "8000000000000050",'
+ ' "unknownAuthProtocol": "authkey50",'
+ ' "unknownProtocol": "privkey50"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_MISSING_ENGINE = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "user": "user51",'
+ ' "badengineId": "8000000000000051",'
+ ' "unknownAuthProtocol": "authkey51",'
+ ' "unknownProtocol": "privkey51"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_COMMA = ','
+ JSON_END = '}'
+
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+
def test_v3_config_present(self):
"""
Test that snmpv3 config is present
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" }, { "user": "user4"} , { "engineId": "1"}, { "user": "user6", "engineId": "8000000000000006", "usmNoAuth": "authkey3", "usmAesCfb192": "privkey6" }, { "user": "user7", "engineId": "8000000000000007", "usmNoAuth": "", "usmNoPriv": "" }] } }'
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_USERS +
+ test_snmpv3_config.JSON_END
)
+ tds.c_config = json.loads(pconfig)
- # del os.environ['CBS_SIM_JSON']
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
- with pytest.raises(Exception) as pytest_wrapped_sys_exit:
- snmp_engine = engine.SnmpEngine()
- result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
- # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3)
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
def test_v3_config_not_present(self):
"""
Test that app is ok if v3 config not present
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } } }'
- )
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
- # del os.environ['CBS_SIM_JSON']
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
+
+
+ @unittest.skip("need to understand what happens when a username is missing")
+ def test_v3_config_missing_user(self):
+ """
+ Test that app is ok if v3 config has a missing user name
+ """
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_MISSING_USER +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
+
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
+
+
+ def test_v3_config_missing_engine(self):
+ """
+ Test that app is ok if v3 config has a missing engine name
+ """
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_MISSING_ENGINE +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
- with pytest.raises(Exception) as pytest_wrapped_sys_exit:
- snmp_engine = engine.SnmpEngine()
- result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
- # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3)
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py
index 16b0a17..3041884 100644
--- a/tests/test_trapd_stormwatch.py
+++ b/tests/test_trapd_stormwatch.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,14 +14,15 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
import time
+from unittest.mock import patch
import trapd_stormwatch as sw
import trapd_stormwatch_settings as sws
import trapd_stats_settings as stats
+import trapd_settings as tds
class test_cleanup_and_exit(unittest.TestCase):
@@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase):
Test for presense of required vars
"""
- def test_increment_existing_counter(self):
- """
- Test increment counter
- """
- sw.sw_init()
+ @classmethod
+ def setUp(cls):
+ tds.init()
stats.init()
+ sw.sw_init()
+ sws.init()
- oid = ".1.2.3.4.5.6"
- sws.sw_config_oid_dict[oid] = True
- sws.sw_config_low_water_in_interval_dict[oid] = 1
- sws.sw_config_high_water_in_interval_dict[oid] = 10
-
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
-
- # try again, but with stats.total_notifications removed
- delattr(stats, "total_notifications")
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
+ def test_sw_init(self):
+ """ test sw_init() """
+ sw.sw_init()
+ self.assertEqual(sws.sw_interval_in_seconds, 60)
+ sws.init()
+ self.assertEqual(sws.sw_config_category, {})
- self.assertEqual(result, True)
def test_sw_clear_dicts(self):
"""
Test sw_clear_dicts
"""
- sw.sw_init()
- # initialize attributes not handled by sw_init()
- sws.sw_storm_counter_dict = {}
- stats.agent_counter_dict = {}
- stats.oid_counter_dict = {}
- sws.sw_config_category = {}
- # provide a value that can tested for
- sws.sw_storm_counter_dict["abc"] = "def"
+ with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'):
+ with patch.dict('trapd_stormwatch_settings.sw_config_category'):
+ with patch.dict('trapd_stats_settings.agent_counter_dict'):
+ with patch.dict('trapd_stats_settings.oid_counter_dict'):
+
+ # initialize attributes not handled by sw_init()
+ sws.sw_storm_counter_dict = {}
+ sws.sw_config_category = {}
+ stats.agent_counter_dict = {}
+ stats.oid_counter_dict = {}
+
+ # provide a value that can tested for
+ sws.sw_storm_counter_dict["abc"] = "def"
+ self.assertTrue("abc" in sws.sw_storm_counter_dict)
+
+ self.assertTrue(sw.sw_clear_dicts())
+ self.assertFalse("abc" in sws.sw_storm_counter_dict)
- sw.sw_clear_dicts()
- self.assertFalse("abc" in sws.sw_storm_counter_dict)
+ # now make sure we get an exception
+ sws.sw_config_category = 3
+ self.assertFalse(sw.sw_clear_dicts())
- # now make sure we get an exception
- sws.sw_config_category = 3
- self.assertFalse(sw.sw_clear_dicts())
- # clean up the attributes we added above
- delattr(sws, "sw_storm_counter_dict")
- delattr(stats, "agent_counter_dict")
- delattr(stats, "oid_counter_dict")
- delattr(sws, "sw_config_category")
+ def test_sw_load_trap_config(self):
+ """ Test sw_load_trap_config(_config) """
+ trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "trap_config": { },
+ "notify OID len": 10,
+ }
+
+ # normal operation, and variations
+ ret1 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret1, 0)
+ self.assertIsInstance(ret1, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ { "oidx": "1.3.6.1.4.1.9999.9.9.888" }
+ ]
+ ret2 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret2, 0)
+ self.assertIsInstance(ret2, int)
+
+ trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33
+ ret3 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret3, 0)
+ self.assertIsInstance(ret3, int)
+
+ trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50
+ ret4 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret4, 0)
+ self.assertIsInstance(ret4, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ {
+ "oid": "1.3.6.1.4.1.9999.9.9.888",
+ "sw_high_water_in_interval": 3,
+ "sw_low_water_in_interval": 2,
+ "category": "abc",
+ }
+ ]
+ ret5 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret5, 1)
+ self.assertIsInstance(ret5, int)
+
+ delattr(sws, 'sw_storm_active_dict')
+ ret6 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret6, 1)
+ self.assertIsInstance(ret6, int)
+
def test_sw_log_metrics(self):
"""
Test sw_clear_log_metrics
"""
- sw.sw_init()
-
stats.total_notifications = 3
stats.total_notifications = 50
sws.sw_interval_in_seconds = 30
stats.agent_counter_dict = {"a": 3, "b": 40}
stats.metric_log_notification_threshold_pct = 30
- sw.sw_log_metrics()
- # make sure we got this far
- assert True
+ with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger:
+ sw.sw_log_metrics()
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+
+
+ def test_increment_existing_counter(self):
+ """
+ Test increment counter. There should NOT be an exception.
+ """
+ oid = ".1.2.3.4.5.6"
+ sws.sw_config_oid_dict[oid] = True
+ sws.sw_config_low_water_in_interval_dict[oid] = 1
+ sws.sw_config_high_water_in_interval_dict[oid] = 10
+
+ loc_agent = "192.168.1.1"
+ stats.agent_counter_dict[loc_agent] = 2
+ stats.oid_counter_dict[oid] = 102
+
+ sv_total_notifications = stats.total_notifications
+ sv_agent_count_dict = stats.agent_counter_dict[loc_agent]
+ sv_oid_count_dict = stats.oid_counter_dict[oid]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1)
+
+ # try again, without agent_counter_dict[loc_agent]
+ del stats.agent_counter_dict[loc_agent]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+2)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2)
+
+ # try again, but with stats.total_notifications removed
+ delattr(stats, "total_notifications")
+
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except:
+ result = False
+
+ self.assertEqual(result, True, "stats.total_notifications removed")
+ self.assertEqual(stats.total_notifications, 1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 2)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3)
+
def test_sw_storm_active(self):
"""
Test sw_storm_active()
"""
- sw.sw_init()
+ print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}")
+
# initialize attributes not handled by sw_init()
stats.oid_counter_dict = {}
@@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase):
self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
# now force sws.sw_last_stormwatch_dict_analysis to an old value
- sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20
+ sws.sw_last_stormwatch_dict_analysis = (
+ int(time.time()) - sws.sw_interval_in_seconds - 20
+ )
# and make certain that stats.oid_counter_dict got cleared.
if not hasattr(stats, "oid_counter_dict"):
stats.oid_counter_dict = {}
@@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase):
# .get("abc") != None)
-if __name__ == "__main__":
+ @patch('trapd_stormwatch.ecomp_logger')
+ def test_sw_reset_counter_dict(self, magic_ecomp_logger):
+ """ test sw_reset_counter_dict() """
+
+ loc_agent = "192.168.1.1"
+ loc_oid = ".1.2.3.4.5.6"
+ loc_agent_oid = loc_agent + " " + loc_oid
+
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 2)
+
+ # cases:
+ # for lao in storm_active_dict:
+ # storm_counter_dict[lao] >= high_water_val[lo]
+ # storm_counter_dict[lao] < high_water_val[lo]
+ # storm_counter_dict[lao] < low_water_val[lo]
+ # storm_counter_dict[lao] >= low_water_val[lo]
+
+ with patch.dict(sws.sw_storm_counter_dict, {
+ loc_agent_oid: 20
+ }):
+ # values around the 20 above
+ for high_water_mark in [2, 60]:
+ # values around the 20 above
+ for low_water_mark in [0, 30]:
+ with patch.dict(sws.sw_config_high_water_in_interval_dict, {
+ loc_oid: high_water_mark
+ }):
+ with patch.dict(sws.sw_config_low_water_in_interval_dict, {
+ loc_oid: low_water_mark
+ }):
+ with patch.dict(sws.sw_storm_active_dict, {
+ loc_agent_oid: "anything"
+ }):
+ sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid]
+ magic_ecomp_logger.call_count = 0
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+ self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0)
+
+
+if __name__ == "__main__": # pragma: no cover
# sws.init()
unittest.main()
diff --git a/tests/test_trapd_stormwatch_settings.py b/tests/test_trapd_stormwatch_settings.py
index bcb04a7..f5ad9a7 100644
--- a/tests/test_trapd_stormwatch_settings.py
+++ b/tests/test_trapd_stormwatch_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -34,26 +33,16 @@ class test_cleanup_and_exit(unittest.TestCase):
Test nosuch var
"""
sws.init()
- try:
- sws.no_such_var
- result = True
- except:
- result = False
+ self.assertFalse(hasattr(sws, 'no_such_var'))
- self.assertEqual(result, False)
def test_storm_counter_dict(self):
"""
Test storm_counter_dict
"""
sws.init()
- try:
- sws.sw_storm_counter_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_storm_counter_dict'))
- self.assertEqual(result, True)
def test_storm_active_dict(self):
"""
@@ -61,13 +50,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_storm_active_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_storm_active_dict'))
- self.assertEqual(result, True)
def test_sw_config_oid_dict(self):
"""
@@ -75,13 +59,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_oid_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_oid_dict'))
- self.assertEqual(result, True)
def test_sw_config_low_water_in_interval_dict(self):
"""
@@ -89,13 +68,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_low_water_in_interval_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_low_water_in_interval_dict'))
- self.assertEqual(result, True)
def test_sw_config_high_water_in_interval_dict(self):
"""
@@ -103,13 +77,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_high_water_in_interval_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_high_water_in_interval_dict'))
- self.assertEqual(result, True)
def test_sw_config_category(self):
"""
@@ -117,43 +86,26 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_category
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_category'))
- self.assertEqual(result, True)
def test_sw_interval_in_seconds(self):
"""
Test sw_interval
"""
-
sws.init()
- try:
- str(sws.sw_interval_in_seconds).isnumeric()
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_interval_in_seconds'))
+ self.assertTrue(str(sws.sw_interval_in_seconds).isnumeric())
- self.assertEqual(result, True)
def test_sw_last_stormwatch_dict_analysis(self):
"""
Test last_stormwatch_dict_analysis
"""
-
sws.init()
- try:
- str(sws.sw_last_stormwatch_dict_analysis).isnumeric()
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
+ self.assertTrue(hasattr(sws, 'sw_last_stormwatch_dict_analysis'))
+ self.assertTrue(str(sws.sw_last_stormwatch_dict_analysis).isnumeric())
-if __name__ == "__main__":
- # sws.init()
- unittest.main()
+if __name__ == "__main__": # pragma: no cover
+ unittest.main(verbosity=2)
diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py
index 5792b8c..28150c9 100644
--- a/tests/test_trapd_vb_types.py
+++ b/tests/test_trapd_vb_types.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,18 +14,10 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
-import json
import unittest
-import os
-from onap_dcae_cbs_docker_client.client import get_config
-from trapd_exit import cleanup_and_exit
-from trapd_io import stdout_logger, ecomp_logger
-import trapd_settings as tds
import trapd_vb_types
-
-from pysnmp.entity import engine, config
+import trapd_settings as tds
class test_trapd_vb_types(unittest.TestCase):
@@ -33,121 +25,102 @@ class test_trapd_vb_types(unittest.TestCase):
Test snmpv3 module
"""
- good_varbind_types = [
- "Integer",
- "Unsigned32",
- "Counter32",
- "OctetString",
- "ObjectIdentifier",
- "TimeTicks",
- "IpAddress",
- ]
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
- def trapd_vb_type_conversion_integer(self):
+
+ def test_trapd_vb_type_conversion_integer32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32"), "integer")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32")
- self.assertEqual(result, "integer")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_integer(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer"), "integer")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer")
- self.assertEqual(result, "integer")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_gauge32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32"), "unsigned")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32")
- self.assertEqual(result, "unsigned")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_counter32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32"), "counter32")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32")
- self.assertEqual(result, "counter32")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_octetstring(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString"), "octet")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString")
- self.assertEqual(result, "octet")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_py_type_5(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5"), "hex")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5")
- self.assertEqual(result, "hex")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_py_type_6(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6"), "decimal")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6")
- self.assertEqual(result, "decimal")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_null(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null"), "null")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null")
- self.assertEqual(result, "null")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_objectidentifier(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier"), "oid")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier")
- self.assertEqual(result, "oid")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_timeticks(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks"), "timeticks")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks")
- self.assertEqual(result, "timeticks")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_ipaddress(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress"), "ipaddress")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress")
- self.assertEqual(result, "ipaddress")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_bits(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits"), "bits")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits")
- self.assertEqual(result, "bits")
- def trapd_vb_type_conversion_invalid(self):
+ def test_trapd_vb_type_conversion_invalid(self):
"""
Test that pysnmp varbind types convert accurately
"""
-
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType")
# should return default of octet if not defined
- self.assertEqual(result, "octet")
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType"), "octet")
-if __name__ == "__main__":
- unittest.main()
+if __name__ == "__main__": # pragma: no cover
+ unittest.main(verbosity=2)
diff --git a/tox.ini b/tox.ini
index a91a3e3..b426e30 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,23 +1,24 @@
# content of: tox.ini , put in same dir as setup.py
[tox]
# envlist = py36
-envlist = py36,py37,py38,py39
-skip_missing_interpreters = true
+# envlist = py36,py37,py38,py39
+envlist = py39
+# skip_missing_interpreters = true
+# isolated_build = True
[testenv]
deps=
- -rrequirements.txt
+ -rrequirements.txt
pytest
coverage
pytest-cov
+
setenv =
PYTHONPATH={toxinidir}/snmptrap:{toxinidir}/snmptrap/mod:{toxinidir}/tests
CBS_SIM_JSON={toxinidir}/etc/snmptrapd.json
-recreate = True
+
+# recreate = True
+
commands=
- mkdir -p /tmp/opt/app/snmptrap/logs/
- mkdir -p /tmp/opt/app/snmptrap/tmp/
- mkdir -p /tmp/opt/app/snmptrap/etc/
- mkdir -p /tmp/opt/app/snmptrap/data/
- pytest --cov snmptrap --cov-report=xml --cov-report=term tests --verbose
+ pytest --cov snmptrap --cov-report=xml --cov-report=html --cov-report=term tests --verbose --verbose --verbose
whitelist_externals = mkdir
diff --git a/version.properties b/version.properties
index 7b12999..a640c99 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=2
minor=0
-patch=6
+patch=7
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT