aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorHansen, Tony (th1395) <th1395@att.com>2022-08-17 21:36:24 +0000
committerHansen, Tony (th1395) <th1395@att.com>2022-08-22 20:40:07 +0000
commit4b6b34642374103b8ccf938eb9e970232bfb63ae (patch)
tree526c304ddc9207831cc27b6b26d01e46be5c763c /tests
parent5116f87391d00c04b789a59d1abcebbbf80147ad (diff)
CodeCoverage improvement (60% to 90%)
Change-Id: I579e9d3fedbd6cc2589d189b121fa7dadfb6f7f1 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com> Issue-ID: DCAEGEN2-3158 Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/test_snmptrapd.py610
-rw-r--r--tests/test_trapd_exit.py26
-rw-r--r--tests/test_trapd_get_cbs_config.py221
-rw-r--r--tests/test_trapd_http_session.py63
-rw-r--r--tests/test_trapd_io.py521
-rw-r--r--tests/test_trapd_runtime_pid.py39
-rw-r--r--tests/test_trapd_settings.py48
-rw-r--r--tests/test_trapd_snmpv3.py471
-rw-r--r--tests/test_trapd_stormwatch.py250
-rw-r--r--tests/test_trapd_stormwatch_settings.py76
-rw-r--r--tests/test_trapd_vb_types.py95
11 files changed, 1889 insertions, 531 deletions
diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py
index dee2aa0..736e114 100644
--- a/tests/test_snmptrapd.py
+++ b/tests/test_snmptrapd.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,11 +14,17 @@
# limitations under the License.
# ============LICENSE_END=========================================================
+import copy
+import datetime
import os
-import pytest
import unittest
+from pathlib import Path
+import time
+from unittest.mock import patch, Mock
+
+import requests
+
import snmptrapd
-import datetime
import trapd_settings as tds
import trapd_stormwatch_settings as sws
@@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase):
Test the save_pid mod
"""
- pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
+ class WriteThrows():
+ def write():
+ raise RuntimeError("write() throws")
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ # init vars
+ tds.init()
+ sw.sw_init()
+
+ # fmt: off
+ test_snmptrapd.pytest_empty_data = "{}"
+ test_snmptrapd.pytest_json_data = (
+ '{ "snmptrapd": { '
+ ' "version": "2.0.3", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ ' "protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ ' "cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ ' "publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ ' "streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ ' "files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ ' "trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ ' "snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ ' ] } }'
+ )
+ # fmt: off
+
+ test_snmptrapd.trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "notify OID len": 10,
+ }
+
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ try:
+ Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+ # create copy of snmptrapd.json for pytest
+ test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
+ with open(test_snmptrapd.pytest_json_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_json_data)
+
+ test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json"
+ with open(test_snmptrapd.pytest_empty_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_empty_data)
- # create copy of snmptrapd.json for pytest
- pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
- with open(pytest_json_config, "w") as outfile:
- outfile.write(pytest_json_data)
def test_usage_err(self):
"""
Test usage error
"""
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- result = snmptrapd.usage_err()
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 1
+ with self.assertRaises(SystemExit) as exc:
+ snmptrapd.usage_err()
+ self.assertEqual(str(exc.exception), "1")
+
def test_load_all_configs(self):
"""
Test load of all configs
"""
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
- # init vars
- tds.init()
- sw.sw_init()
+ result = trapd_get_cbs_config.get_cbs_config()
+ self.assertEqual(result, True)
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ # request load of CBS data
+ self.assertEqual(snmptrapd.load_all_configs(0, 1), True)
+
+
+ def test_resolve_ip(self):
+ """ Test resolve_ip """
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ time_base = 1000000
+ time_offset = 10000
+ with patch('time.time', return_value=time_base):
+ self.assertEqual(time.time(), time_base)
+
+ fqdn = "foo.example"
+ ip = "1.2.3.4"
+ with patch.dict(tds.dns_cache_ip_to_name):
+ tds.dns_cache_ip_to_name = { }
+ # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires
+ # and gethostbyaddr() fails
+ with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")):
+ self.assertEqual(snmptrapd.resolve_ip(ip), ip)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], ip)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ tds.dns_cache_ip_to_name = { ip: fqdn }
+ with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])):
+ # EXCEPTION - nothing in tds.dns_cache_ip_expires
+ del tds.dns_cache_ip_expires[ip]
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset)
- # request load of CBS data
- result = snmptrapd.load_all_configs(0, 1)
- self.assertEqual(result, True)
def test_load_all_configs_signal(self):
"""
@@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+ # request load of CBS data
+ self.assertTrue(snmptrapd.load_all_configs(1, 1))
+
+ with patch('snmptrapd.get_cbs_config', return_value=False):
+ with self.assertRaises(SystemExit):
+ snmptrapd.load_all_configs(1, 1)
- # request load of CBS data
- result = snmptrapd.load_all_configs(1, 1)
- self.assertEqual(result, True)
def test_log_all_arriving_traps(self):
"""
Test logging of traps
"""
-
# init vars
tds.init()
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
-
- # set last day to current
- tds.last_day = datetime.datetime.now().day
-
- # trap dict for logging
- tds.trap_dict = {
- "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
- "agent address": "1.2.3.4",
- "agent name": "test-agent.nodomain.com",
- "cambria.partition": "test-agent.nodomain.com",
- "community": "",
- "community len": 0,
- "epoch_serno": 15222068260000,
- "protocol version": "v2c",
- "time received": 1522206826.2938566,
- "trap category": "ONAP-COLLECTOR-SNMPTRAP",
- "sysUptime": "218567736",
- "notify OID": "1.3.6.1.4.1.9999.9.9.999",
- "notify OID len": 10,
- }
+ # don't open files, but try to log - should raise exception
+ with self.assertRaises(Exception) as exc:
+ snmptrapd.log_all_arriving_traps()
+ self.assertIsInstance(exc.exception, TypeError)
- # open eelf logs
- trapd_io.open_eelf_logs()
-
- # open trap logs
- tds.arriving_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + (tds.c_config["files"]["arriving_traps_log"])
- )
- tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
-
- # name and open json trap log
- tds.json_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + "DMAAP_"
- + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
- + ".json"
- )
- tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
- msg = "published traps logged to: %s" % tds.json_traps_filename
- trapd_io.stdout_logger(msg)
- trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ result = trapd_get_cbs_config.get_cbs_config()
+
+ # set last day to current
+ tds.last_day = datetime.datetime.now().day
+
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
+
+ # open trap logs
+ tds.arriving_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + (tds.c_config["files"]["arriving_traps_log"])
+ )
+ tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
+
+ # name and open json trap log
+ tds.json_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + "DMAAP_"
+ + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
+ + ".json"
+ )
+ tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
+ msg = "published traps logged to: %s" % tds.json_traps_filename
+ trapd_io.stdout_logger(msg)
+ trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # also force it to daily roll
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, but with day rolling
+ tds.last_day = datetime.datetime.now().day - 1
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to minute
+ tds.last_minute = datetime.datetime.now().minute - 1
+ tds.c_config["files"]["roll_frequency"] = "minute"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to hour
+ tds.last_hour = datetime.datetime.now().hour - 1
+ tds.c_config["files"]["roll_frequency"] = "hour"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with a bad trap_dict[time_received]
+ tds.trap_dict["time received"] = "bad_value"
+ snmptrapd.log_all_arriving_traps()
+
+ # also test log_published_messages()
+ snmptrapd.log_published_messages("data #1")
+
+ # work even if there is an exception
+ # this SHOULD be done with a context
+ sv_json_traps_fd = tds.json_traps_fd
+ tds.json_traps_fd = test_snmptrapd.WriteThrows()
+ snmptrapd.log_published_messages("data #2")
+ tds.json_traps_fd = sv_json_traps_fd
- # don't open files, but try to log - should raise exception
- with pytest.raises(Exception) as pytest_wrapped_exception:
- result = snmptrapd.log_all_arriving_traps()
- assert pytest_wrapped_exception.type == AttributeError
def test_log_all_incorrect_log_type(self):
"""
@@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
- # open eelf logs
- trapd_io.open_eelf_logs()
def test_v1_trap_receipt(self):
"""
@@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
-
- errorIndication, errorStatus, errorIndex, varbinds = next(
- sendNotification(
- SnmpEngine(),
- CommunityData("not_public"),
- UdpTransportTarget(("localhost", 6162)),
- ContextData(),
- "trap",
- [
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
- ],
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ errorIndication, errorStatus, errorIndex, varbinds = next(
+ sendNotification(
+ SnmpEngine(),
+ CommunityData("not_public"),
+ UdpTransportTarget(("localhost", 6162)),
+ ContextData(),
+ "trap",
+ [
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
+ ],
+ )
)
- )
- result = errorIndication
- self.assertEqual(result, None)
+ result = errorIndication
+ self.assertEqual(result, None)
+
+
+ def test_post_dmaap(self):
+ """
+ Test post_dmaap()
+ """
+
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger:
+ with patch('requests.Session.post') as magic_session_post:
+ fake_post_resp = Mock()
+ magic_session_post.return_value = fake_post_resp
+
+ fake_post_resp.status_code = requests.codes.moved_permanently
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 11)
+
+ fake_post_resp.status_code = requests.codes.ok
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 16)
+
+ magic_ecomp_logger.call_count = 0
+ tds.traps_since_last_publish = 1
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username"
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password"
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ # for some reason this exception is being seen as an OSError ????
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = requests.exceptions.RequestException("test throw")
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = OSError()
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+
+ @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method")
+ def test_comm_string_rewrite_observer(self):
+ """
+ test comm_string_rewrite_observer()
+ """
+ vars = { "communityName": ["name"] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "public")
+
+ vars = { "communityName": [] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "")
+
+
+ def test_snmp_engine_observer_cb(self):
+ """
+ test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx):
+ """
+
+ snmp_engine = "snmp_engine"
+ execpoint = "execpoint"
+ cbCtx = "cbCtx"
+ variables = {
+ 'transportDomain': [ 1, 2, 3 ],
+ 'transportAddress': [ "a", "b", "c" ]
+ }
+ for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]:
+ variables["securityModel"] = secmodel
+ snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx)
+ self.assertEqual(tds.trap_dict["protocol version"], ret)
-if __name__ == "__main__":
+ def test_add_varbind_to_log_string(self):
+ """
+ test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val)
+ """
+ vb_oid = "vb_oid"
+ vb_type = "vb_type"
+
+ class TempPP():
+ def prettyPrint(self):
+ return "pp ret"
+
+ vb_val = TempPP()
+
+ self.assertEqual(tds.all_vb_str, "")
+
+ snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret')
+
+ snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret')
+
+
+ def test_add_varbind_to_json(self):
+ """
+ test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val)
+ """
+
+ class TempPP():
+ def __init__(self, ret):
+ self.ret = ret
+ def prettyPrint(self):
+ return self.ret
+
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ vb_oid = TempPP("1.2.3")
+ override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0")
+
+ vb_type = "vb_type"
+
+ vb_val = TempPP("foo.example")
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["notify OID"], ".foo.example")
+ self.assertEqual(tds.trap_dict["notify OID len"], 2)
+
+ with patch('snmptrapd.resolve_ip') as magic_resolve_ip:
+ magic_resolve_ip.return_value = 'foo.example'
+ self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["agent address"], "foo.example")
+ self.assertEqual(tds.trap_dict["agent name"], "foo.example")
+
+ sv_protocol_version = tds.trap_dict["protocol version"]
+ tds.trap_dict["protocol version"] = "v1"
+ self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1)
+
+ tds.trap_dict["protocol version"] = sv_protocol_version
+ self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+
+ @patch('snmptrapd.log_all_arriving_traps')
+ @patch('snmptrapd.post_dmaap', return_value = 0)
+ @patch('snmptrapd.ecomp_logger', return_value = 0)
+ @patch('snmptrapd.add_varbind_to_json', return_value = 1)
+ @patch('snmptrapd.add_varbind_to_log_string', return_value = 0)
+ def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json,
+ magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps):
+ """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=True):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertFalse(tds.first_trap)
+ self.assertEqual(magic_ecomp_logger.call_count, 8)
+
+ magic_ecomp_logger.call_count = 0
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 1
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 100
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0
+ tds.last_pub_time = 0
+ with patch('time.time', return_value=0):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.last_pub_time = 100000
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1
+ with patch('time.time', return_value=10):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py
index 0e38461..371b308 100644
--- a/tests/test_trapd_exit.py
+++ b/tests/test_trapd_exit.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -29,27 +28,26 @@ class test_cleanup_and_exit(unittest.TestCase):
def test_normal_exit(self):
"""
- Test normal exit works as expected
+ Test normal exit works as expected, and exits with the 1st arg
"""
- open(pid_file, "w")
+ # create an empty pid file
+ with open(pid_file, "w"):
+ pass
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+ with self.assertRaises(SystemExit) as exc:
result = trapd_exit.cleanup_and_exit(0, pid_file)
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 0
+ self.assertEqual(str(exc.exception), "0")
- # compare = str(result).startswith("SystemExit: 0")
- # self.assertEqual(compare, True)
def test_abnormal_exit(self):
"""
- Test exit with missing PID file exits non-zero
+ Test exit with missing PID file. Still exits with the 1st arg.
"""
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+
+ with self.assertRaises(SystemExit) as exc:
result = trapd_exit.cleanup_and_exit(0, pid_file_dne)
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 1
+ self.assertEqual(str(exc.exception), "0")
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py
index b7c5d7b..4064b78 100644
--- a/tests/test_trapd_get_cbs_config.py
+++ b/tests/test_trapd_get_cbs_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,101 +14,198 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
-import unittest
+import json
import os
import sys
+import unittest
+from unittest.mock import patch
+from pathlib import Path
-from onap_dcae_cbs_docker_client.client import get_config
-from trapd_exit import cleanup_and_exit
-from trapd_io import stdout_logger, ecomp_logger
-import trapd_settings as tds
import trapd_get_cbs_config
-from pathlib import Path
-# # # # # # # #
-# ENV setup
-# # # # # # # #
-
-# required directory tree
-try:
- Path("/tmp/opt/app/snmptrap/logs").mkdir(parents=True, exist_ok=True)
- Path("/tmp/opt/app/snmptrap/tmp").mkdir(parents=True, exist_ok=True)
-except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
-
-# env var for CBS_SIM_JSON
-try:
- os.environ["CBS_SIM_JSON"] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
-except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
-
-pytest_json_data = '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
-
-# create snmptrapd.json for pytest
-pytest_json_config = os.getenv("CBS_SIM_JSON")
-with open(pytest_json_config, "w") as outfile:
- outfile.write(pytest_json_data)
-outfile.close()
-
-# test class/methods
-class test_get_cbs_config(unittest.TestCase):
+class test_trapd_get_cbs_config(unittest.TestCase):
"""
Test the trapd_get_cbs_config mod
"""
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ json_dir = snmptrap_dir + "/etc"
+
+ # fmt: off
+ pytest_json_data = json.loads(
+ '{'
+ '"snmptrapd": { '
+ ' "version": "1.4.0", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ '"protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ '"cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ '"publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ '"streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ '"files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ '"trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ '"snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ '] } }'
+ )
+ # fmt: on
+
+
+ @classmethod
+ def setUpClass(cls):
+ """ set up the required directory tree """
+ try:
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(test_trapd_get_cbs_config.snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+
+ def write_config(self, filename, config):
+ """
+ write a config file
+ """
+ # create snmptrapd.json for pytest
+ with open(filename, "w") as outfile:
+ json.dump(config, outfile)
+
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"})
def test_cbs_fallback_env_present(self):
"""
Test that CBS fallback env variable exists and we can get config
from fallback env var
"""
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, True)
- self.assertEqual(result, True)
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json"
+ self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", test_trapd_get_cbs_config.pytest_json_data)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"})
+ def test_cbs_fallback_env_present_bad_numbers(self):
+ """
+ Test as in test_cbs_fallback_env_present(), but with
+ various values reset to be non-numeric.
+ """
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json"
+ with patch.dict(test_trapd_get_cbs_config.pytest_json_data):
+ test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_milliseconds_between_retries"] = "notanumber"
+ test_trapd_get_cbs_config.pytest_json_data["files"]["minimum_severity_to_log"] = "notanumber"
+ test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_retries"] = "notanumber"
+ self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json",
+ test_trapd_get_cbs_config.pytest_json_data)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+
+ @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/nosuchfile.json"})
def test_cbs_override_env_invalid(self):
""" """
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/nosuchfile.json")
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/nosuchfile.json"
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
+ with self.assertRaises(SystemExit) as exc:
result = trapd_get_cbs_config.get_cbs_config()
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
+ self.assertEqual(str(exc.exception), "1")
+
+ @patch.dict(os.environ, {"CONSUL_HOST": "localhost"})
def test_cbs_env_present(self):
"""
Test that CONSUL_HOST env variable exists but fails to
respond
"""
- os.environ.update(CONSUL_HOST="localhost")
+ self.assertEqual(os.getenv("CONSUL_HOST"), "localhost")
+
del os.environ["CBS_SIM_JSON"]
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ self.assertNotIn("CBS_SIM_JSON", os.environ)
- with pytest.raises(SystemExit) as sys_exit:
+ with self.assertRaises(SystemExit) as exc:
trapd_get_cbs_config.get_cbs_config()
- assert sys_exit.value.errno == errno.ECONNREFUSED
+
+ @patch.dict(os.environ, {})
def test_cbs_override_env_undefined(self):
""" """
- print("------>>> RUNNING test_no_cbs_override_env_var:")
del os.environ["CBS_SIM_JSON"]
+ self.assertNotIn("CBS_SIM_JSON", os.environ)
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- assert trapd_get_cbs_config.get_cbs_config() == SystemExit
+ with self.assertRaises(SystemExit) as exc:
+ trapd_get_cbs_config.get_cbs_config()
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py
index 7ec89a6..667a454 100644
--- a/tests/test_trapd_http_session.py
+++ b/tests/test_trapd_http_session.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,9 +14,11 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_http_session
+import requests
+from unittest.mock import Mock, patch
+import trapd_settings as tds
class test_init_session_obj(unittest.TestCase):
@@ -24,39 +26,62 @@ class test_init_session_obj(unittest.TestCase):
Test the init_session_obj mod
"""
- def close_nonexisting_session(self):
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+ def test_init_session_obj(self):
"""
- test close of existing http session
+ test creation of http session
"""
- sess = "no session"
- result = trapd_http_session.close_session_obj(sess)
- self.assertEqual(result, True)
+ self.assertIsInstance(trapd_http_session.init_session_obj(), requests.sessions.Session)
+
- def init_session(self):
+ def test_init_session_obj_raises(self):
"""
- test creation of http session
+ test close when the requests.Session() method throws an exception
+ """
+ with patch('requests.Session') as magic_requests:
+ magic_requests.side_effect = RuntimeError("Session() throws via mock")
+ with self.assertRaises(SystemExit) as exc:
+ trapd_http_session.init_session_obj()
+ self.assertEqual(str(exc.exception), "1")
+
+
+ def test_close_nonexisting_session(self):
"""
- result = trapd_http_session.init_session_obj()
- compare = str(result).startswith("<requests.sessions.Session object at")
- self.assertEqual(compare, True)
+ test close of non-existing http session
+ """
+ self.assertIsNone(trapd_http_session.close_session_obj(None))
+
+
+ def test_close_nonexisting_close_raises(self):
+ """
+ test close when the session.close() method throws
+ """
+ class CloseThrows():
+ def close(self):
+ raise RuntimeError("close() throws")
+
+ with self.assertRaises(SystemExit):
+ trapd_http_session.close_session_obj(CloseThrows())
+
def test_reset(self):
"""
test reset of existing http session
"""
sess = trapd_http_session.init_session_obj()
- result = trapd_http_session.reset_session_obj(sess)
- compare = str(result).startswith("<requests.sessions.Session object at")
- self.assertEqual(compare, True)
+ self.assertIsInstance(trapd_http_session.reset_session_obj(sess), requests.sessions.Session)
+
- def close_existing_session(self):
+ def test_close_existing_session(self):
"""
test close of existing http session
"""
sess = trapd_http_session.init_session_obj()
- result = trapd_http_session.close_session_obj(sess)
- self.assertEqual(result, True)
+ self.assertTrue(trapd_http_session.close_session_obj(sess))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_io.py b/tests/test_trapd_io.py
index c1702aa..f3d1a4c 100644
--- a/tests/test_trapd_io.py
+++ b/tests/test_trapd_io.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +14,21 @@
# limitations under the License.
# ============LICENSE_END=========================================================
+import datetime
+import glob
+import io
+import json
import os
-import pytest
+from pathlib import Path
+import sys
+import tempfile
import unittest
+from unittest.mock import patch
+
import snmptrapd
-import datetime
-import json
import trapd_settings as tds
import trapd_runtime_pid
import trapd_io
-import sys
-from pathlib import Path
class test_trapd_io(unittest.TestCase):
@@ -32,32 +36,141 @@ class test_trapd_io(unittest.TestCase):
Test the save_pid mod
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
- )
+ class PseudoFile():
+ """ test file-like object that does nothing """
+ def write(self):
+ pass
+ def close(self):
+ pass
- def test_roll_all_files_notopen(self):
- """
- Test rolling files that aren't open
- """
+ class WriteThrows():
+ """ test file-like object that throws on a write """
+ def write(self):
+ raise RuntimeError("close() throws")
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ tds.init()
+
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ try:
+ Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+ # fmt: off
+ tds.c_config = json.loads(
+ '{ "snmptrapd": { '
+ ' "version": "1.4.0", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ '"protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ '"cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ '"publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ '"streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ '"files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ '"trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ '"snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ '] } }'
+ )
+ # fmt: on
+ tds.json_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json"
+ )
+ tds.arriving_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log"
+ )
- # try to roll logs when not open
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.roll_all_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_error_file(self):
"""
Test bad error file location
"""
- # open eelf error logs
- tds.c_config["files.eelf_error"] = "/bad_dir/error.log"
+ with patch.dict(tds.c_config["files"]):
+ # open eelf error logs
+ tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_debug_file(self):
"""
@@ -65,86 +178,248 @@ class test_trapd_io(unittest.TestCase):
"""
# open eelf debug logs
- tds.c_config["files.eelf_debug"] = "/bad_dir/debug.log"
+ with patch.dict(tds.c_config["files"]):
+ tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_audit_file(self):
"""
Test bad audit file location
"""
- # open eelf debug logs
- tds.c_config["files.eelf_audit"] = "/bad_dir/audit.log"
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
def test_open_eelf_metrics_file(self):
"""
Test bad metrics file location
"""
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log"
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_error_file_missing_name(self):
+ """
+ Test bad error file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf error logs
+ del tds.c_config["files"]["eelf_error"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_debug_file_missing_name(self):
+ """
+ Test bad debug file location
+ """
+
# open eelf debug logs
- tds.c_config["files.eelf_metrics"] = "/bad_dir/metrics.log"
+ with patch.dict(tds.c_config["files"]):
+ del tds.c_config["files"]["eelf_debug"]
- # try to open file in non-existent dir
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_eelf_logs()
- assert pytest_wrapped_exception.type == SystemExit
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
- def test_roll_all_logs(self):
+
+ def test_open_eelf_audit_file_missing_name(self):
+ """
+ Test bad audit file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ del tds.c_config["files"]["eelf_audit"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_open_eelf_metrics_file_missing_name(self):
+ """
+ Test bad metrics file location
+ """
+
+ with patch.dict(tds.c_config["files"]):
+ # open eelf debug logs
+ del tds.c_config["files"]["eelf_metrics"]
+
+ # try to open file in non-existent dir
+ with self.assertRaises(SystemExit):
+ result = trapd_io.open_eelf_logs()
+
+
+ def test_roll_all_logs_not_open(self):
"""
Test roll of logs when not open
"""
- # try to roll logs when not open
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.roll_all_logs()
- assert pytest_wrapped_exception.type == SystemExit
+ # try to roll logs when not open. Shouldn't fail
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+ def test_roll_all_logs(self):
+ """
+ Test rolling files that they are open
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_roll_file_throws(self):
+ """
+ Test rolling files that they are open
+ but roll_file throws an exception
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.roll_file') as roll_file_throws:
+ roll_file_throws.side_effect = RuntimeError("roll_file() throws")
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_eelf_logs_returns_false(self):
+ """
+ Test rolling files that they are open
+ but open_eelf_logs returns false
+ """
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws:
+ open_eelf_logs_throws.return_value = False
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_file_json_traps_throws(self):
+ """
+ Test rolling files that they are open
+ but open_file(json_traps_filename) throws an exception
+ """
+
+ def tmp_func(nm):
+ if nm == tds.json_traps_filename:
+ raise RuntimeError("json_traps_filename throws")
+ return test_trapd_io.PseudoFile()
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_file') as open_file_throws:
+ open_file_throws.side_effect = tmp_func
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
+
+ def test_roll_all_logs_open_file_arriving_traps_throws(self):
+ """
+ Test rolling files that they are open
+ but open_file(arriving_traps_filename) throws an exception
+ """
+
+ def tmp_func(nm):
+ if nm == tds.arriving_traps_filename:
+ raise RuntimeError("arriving_traps_filename throws")
+ return test_trapd_io.PseudoFile()
+
+ trapd_io.open_eelf_logs()
+ # try to roll logs
+ with patch('trapd_io.open_file') as open_file_throws:
+ open_file_throws.side_effect = tmp_func
+ with self.assertRaises(SystemExit):
+ trapd_io.roll_all_logs()
+ self.assertIsNotNone(tds.eelf_error_fd)
+
def test_roll_file(self):
"""
Test roll of individual file when not present
"""
+ # try to roll a valid log file
+ with tempfile.TemporaryDirectory() as ntd:
+ fn = ntd + "/test.log"
+ with open(fn, "w") as ofp:
+ self.assertTrue(trapd_io.roll_file(fn))
+ # The file will be renamed to something like
+ # test.log.2022-08-17T20:28:32
+ self.assertFalse(os.path.exists(fn))
+ # We could also add a test to see if there is a file
+ # with a name like that.
+ files = list(glob.glob(f"{ntd}/*"))
+ print(f"files={files}")
+ self.assertEqual(len(files), 1)
+ self.assertTrue(files[0].startswith(fn + "."))
+
+
+ def test_roll_file_not_present(self):
+ """
+ Test roll of individual file when not present
+ """
+
# try to roll logs when not open
- result = trapd_io.roll_file("/file/not/present")
- self.assertEqual(result, False)
+ self.assertFalse(trapd_io.roll_file("/file/not/present"))
+
def test_roll_file_no_write_perms(self):
"""
try to roll logs when not enough perms
"""
- no_perms_dir = "/tmp/opt/app/snmptrap/no_perms"
- no_perms_file = "test.dat"
- no_perms_fp = no_perms_dir + "/" + no_perms_file
+ with tempfile.TemporaryDirectory() as no_perms_dir:
+ # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms"
+ no_perms_file = "test.dat"
+ no_perms_fp = no_perms_dir + "/" + no_perms_file
- # required directory tree
- try:
- Path(no_perms_dir).mkdir(parents=True, exist_ok=True)
- os.chmod(no_perms_dir, 0o777)
- except Exception as e:
- print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
- sys.exit(1)
+ # required directory tree
+ #try:
+ # Path(no_perms_dir).mkdir(parents=True, exist_ok=True)
+ # os.chmod(no_perms_dir, 0o700)
+ #except Exception as e:
+ # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+
+ # create empty file
+ open(no_perms_fp, "w").close()
+ os.chmod(no_perms_dir, 0o555)
- # create empty file
- open(no_perms_fp, "a").close()
- os.chmod(no_perms_dir, 0o444)
+ # try to roll file in dir with no write perms
+ self.assertFalse(trapd_io.roll_file(no_perms_fp))
- result = trapd_io.roll_file(no_perms_fp)
- self.assertEqual(result, False)
+ # the file should still be there
+ open(no_perms_fp).close()
+
+ # allow the directory to be removed
+ os.chmod(no_perms_dir, 0o700)
- # try to roll file in dir with no write perms
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
- result = trapd_io.open_file(no_perms_fp)
- assert pytest_wrapped_exception.type == SystemExit
def test_open_file_exists(self):
"""
@@ -156,8 +431,9 @@ class test_trapd_io(unittest.TestCase):
# try to roll logs when not open
result = trapd_io.open_file(test_file)
- compare = str(result).startswith("<_io.TextIOWrapper name=")
- self.assertEqual(compare, True)
+ self.assertTrue(str(result).startswith("<_io.TextIOWrapper name="))
+ self.assertIsInstance(result, io.TextIOWrapper)
+
def test_open_file_exists_does_not_exist(self):
"""
@@ -168,9 +444,9 @@ class test_trapd_io(unittest.TestCase):
test_file = "/tmp/no_such_dir/snmptrap_pytest"
# try to open file when dir not present
- with pytest.raises(SystemExit) as pytest_wrapped_exception:
+ with self.assertRaises(SystemExit):
result = trapd_io.open_file(test_file)
- assert pytest_wrapped_exception.type == SystemExit
+
def test_close_file_exists(self):
"""
@@ -182,18 +458,113 @@ class test_trapd_io(unittest.TestCase):
test_file = trapd_io.open_file(test_file_name)
# close active file
- result = trapd_io.close_file(test_file, test_file_name)
- self.assertEqual(result, True)
+ self.assertTrue(trapd_io.close_file(test_file, test_file_name))
+
- def test_close_file_does_not_exists(self):
+ def test_close_file_does_not_exist(self):
"""
Test closing non-existent file
"""
# try to roll logs when not open
- result = trapd_io.close_file(None, None)
- self.assertEqual(result, False)
+ self.assertFalse(trapd_io.close_file(None, None))
+
+
+ def test_ecomp_logger_type_error(self):
+ """
+ test trapd_io.ecomp_logger
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_error_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_error_fd = tds.eelf_error_fd
+ tds.eelf_error_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_error_fd = sv_eelf_error_fd
+
+
+ def test_ecomp_logger_type_unknown_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger, unknown type, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_error_fd = tds.eelf_error_fd
+ tds.eelf_error_fd = test_trapd_io.WriteThrows()
+ self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_error_fd = sv_eelf_error_fd
+
+
+ def test_ecomp_logger_type_metrics(self):
+ """
+ test trapd_io.ecomp_logger to metrics
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_metrics_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger to metrics, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_metrics_fd = tds.eelf_metrics_fd
+ tds.eelf_metrics_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_metrics_fd = sv_eelf_metrics_fd
+
+
+ def test_ecomp_logger_type_audit(self):
+ """
+ test trapd_io.ecomp_logger to audit log
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+
+
+ def test_ecomp_logger_type_audit_bad_fd(self):
+ """
+ test trapd_io.ecomp_logger to audit log, but write() throws
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ # the following SHOULD be done with a context patch
+ sv_eelf_audit_fd = tds.eelf_audit_fd
+ tds.eelf_audit_fd = test_trapd_io.WriteThrows()
+ self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
+ tds.eelf_audit_fd = sv_eelf_audit_fd
+
+
+ def test_ecomp_logger_type_unknown(self):
+ """
+ test trapd_io.ecomp_logger
+ """
+
+ trapd_io.open_eelf_logs()
+ msg = "this is a test"
+ self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py
index c6b8601..923f730 100644
--- a/tests/test_trapd_runtime_pid.py
+++ b/tests/test_trapd_runtime_pid.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,24 +14,30 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
+import os
import unittest
-import trapd_runtime_pid
+from unittest.mock import patch
+
import trapd_io
+import trapd_runtime_pid
-class test_save_pid(unittest.TestCase):
+class test_trapd_runtime_pid(unittest.TestCase):
"""
Test the save_pid mod
"""
+ GOOD_PID_FILE = "/tmp/snmptrap_test_pid_file"
+ BAD_PID_FILE = "/tmp/snmptrap_test_pid_file_not_there"
+
def test_correct_usage(self):
"""
Test that attempt to create pid file in standard location works
"""
- result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
+ result = trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)
self.assertEqual(result, True)
+
def test_missing_directory(self):
"""
Test that attempt to create pid file in missing dir fails
@@ -40,7 +46,6 @@ class test_save_pid(unittest.TestCase):
self.assertEqual(result, False)
-class test_rm_pid(unittest.TestCase):
"""
Test the rm_pid mod
"""
@@ -50,18 +55,26 @@ class test_rm_pid(unittest.TestCase):
Test that attempt to remove pid file in standard location works
"""
# must create it before removing it
- result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file")
- self.assertEqual(result, True)
- result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file")
- self.assertEqual(result, True)
+ self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+ self.assertTrue(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+
def test_missing_file(self):
"""
Test that attempt to rm non-existent pid file fails
"""
- result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999")
- self.assertEqual(result, False)
+ self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.BAD_PID_FILE))
+
+
+ def test_correct_usage_but_throws(self):
+ """
+ Test that an exception while removing returns false
+ """
+ self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
+ with patch('os.remove') as mock_remove:
+ mock_remove.side_effect = RuntimeError("os.remove throws")
+ self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE))
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py
index 92a3144..5625d78 100644
--- a/tests/test_trapd_settings.py
+++ b/tests/test_trapd_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -24,66 +23,43 @@ pid_file_dne = "/tmp/test_pid_file_NOT"
import trapd_settings as tds
-class test_cleanup_and_exit(unittest.TestCase):
+class test_trapd_settings(unittest.TestCase):
"""
Test for presense of required vars
"""
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+
def test_nonexistent_dict(self):
"""
Test nosuch var
"""
- tds.init()
- try:
- tds.no_such_var
- result = True
- except:
- result = False
+ self.assertFalse(hasattr(tds, 'no_such_var'))
- self.assertEqual(result, False)
def test_config_dict(self):
"""
Test config dict
"""
- tds.init()
- try:
- tds.c_config
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(tds, 'c_config'))
- self.assertEqual(result, True)
def test_dns_cache_ip_to_name(self):
"""
Test dns cache name dict
"""
+ self.assertTrue(hasattr(tds, 'dns_cache_ip_to_name'))
- tds.init()
- try:
- tds.dns_cache_ip_to_name
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
def test_dns_cache_ip_expires(self):
"""
Test dns cache ip expires dict
"""
-
- tds.init()
- try:
- tds.dns_cache_ip_expires
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
+ self.assertTrue(hasattr(tds, 'dns_cache_ip_expires'))
-if __name__ == "__main__":
- # tds.init()
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_snmpv3.py b/tests/test_trapd_snmpv3.py
index eac6082..2011953 100644
--- a/tests/test_trapd_snmpv3.py
+++ b/tests/test_trapd_snmpv3.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,10 +14,10 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import json
import unittest
import os
+import traceback
from onap_dcae_cbs_docker_client.client import get_config
from trapd_exit import cleanup_and_exit
@@ -33,48 +33,459 @@ class test_snmpv3_config(unittest.TestCase):
Test snmpv3 module
"""
+ JSON_START = (
+ '{'
+ ' "snmptrapd": {'
+ ' "version": "2.0",'
+ ' "title": "ONAP SNMP Trap Receiver"'
+ ' },'
+ ' "protocols": {'
+ ' "transport": "udp",'
+ ' "ipv4_interface": "0.0.0.0",'
+ ' "ipv4_port": 6162,'
+ ' "ipv6_interface": "::1",'
+ ' "ipv6_port": 6162'
+ ' },'
+ ' "cache": {'
+ ' "dns_cache_ttl_seconds": 10800'
+ ' },'
+ ' "publisher": {'
+ ' "http_milliseconds_timeout": 500,'
+ ' "http_retries": 2,'
+ ' "http_milliseconds_between_retries": 250,'
+ ' "http_primary_publisher": "true",'
+ ' "http_peer_publisher": "unavailable",'
+ ' "max_traps_between_publishes": 10,'
+ ' "max_milliseconds_between_publishes": 10000'
+ ' },'
+ ' "streams_publishes": {'
+ ' "sec_fault_unsecure": {'
+ ' "type": "message_router",'
+ ' "aaf_password": null,'
+ ' "dmaap_info": {'
+ ' "location": "mtl5",'
+ ' "client_id": null,'
+ ' "client_role": null,'
+ ' "topic_url": "http://localhost:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP"'
+ ' },'
+ ' "aaf_username": null'
+ ' }'
+ ' },'
+ ' "files": {'
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap",'
+ ' "log_dir": "logs",'
+ ' "data_dir": "data",'
+ ' "pid_dir": "tmp",'
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log",'
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log",'
+ ' "traps_stats_log": "snmptrapd_stats.csv",'
+ ' "perm_status_file": "snmptrapd_status.log",'
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs",'
+ ' "eelf_error": "error.log",'
+ ' "eelf_debug": "debug.log",'
+ ' "eelf_audit": "audit.log",'
+ ' "eelf_metrics": "metrics.log",'
+ ' "roll_frequency": "hour",'
+ ' "minimum_severity_to_log": 3'
+ ' },'
+ ' "check_hb_traps": {'
+ ' "trap_thr": 900,'
+ ' "hb_thr": 900,'
+ ' "hb_notify_oid": ".1.3.6.1.4.1.74.2.46.12.1.1"'
+ ' },'
+ ' "trap_config": {'
+ ' "sw_interval_in_seconds": 60,'
+ ' "metric_log_notification_threshold_pct": 25,'
+ ' "notify_oids": ['
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.1",'
+ ' "sw_high_water_in_interval": 100,'
+ ' "sw_low_water_in_interval": 5,'
+ ' "category": "logonly"'
+ ' },'
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.2",'
+ ' "sw_high_water_in_interval": 200,'
+ ' "sw_low_water_in_interval": 10,'
+ ' "category": "logonly"'
+ ' },'
+ ' {'
+ ' "oid": ".1.3.6.1.4.1.9.0.3",'
+ ' "sw_high_water_in_interval": 300,'
+ ' "sw_low_water_in_interval": 15,'
+ ' "category": "logonly"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_USERS = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "user": "user1",'
+ ' "engineId": "8000000000000001",'
+ ' "usmHMACMD5AuthProtocol": "authkey1",'
+ ' "usmDESPrivProtocol": "privkey1"'
+ ' },'
+ ' {'
+ ' "user": "user2",'
+ ' "engineId": "8000000000000002",'
+ ' "usmHMACMD5AuthProtocol": "authkey2",'
+ ' "usm3DESEDEPrivProtocol": "privkey2"'
+ ' },'
+ ' {'
+ ' "user": "user3",'
+ ' "engineId": "8000000000000003",'
+ ' "usmHMACMD5AuthProtocol": "authkey3",'
+ ' "usmAesCfb128Protocol": "privkey3"'
+ ' },'
+ ' {'
+ ' "user": "user4",'
+ ' "engineId": "8000000000000004",'
+ ' "usmHMACMD5AuthProtocol": "authkey4",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey4"'
+ ' },'
+ ' {'
+ ' "user": "user5",'
+ ' "engineId": "8000000000000005",'
+ ' "usmHMACMD5AuthProtocol": "authkey5",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey5"'
+ ' },'
+ ' {'
+ ' "user": "user6",'
+ ' "engineId": "8000000000000006",'
+ ' "usmHMACMD5AuthProtocol": "authkey6",'
+ ' "usmAesCfb192Protocol": "privkey6"'
+ ' },'
+ ' {'
+ ' "user": "user7",'
+ ' "engineId": "8000000000000007",'
+ ' "usmHMACMD5AuthProtocol": "authkey7",'
+ ' "usmAesCfb256Protocol": "privkey7"'
+ ' },'
+ ' {'
+ ' "user": "user9",'
+ ' "engineId": "8000000000000009",'
+ ' "usmHMACSHAAuthProtocol": "authkey9",'
+ ' "usmDESPrivProtocol": "privkey9"'
+ ' },'
+ ' {'
+ ' "user": "user10",'
+ ' "engineId": "8000000000000010",'
+ ' "usmHMACSHAAuthProtocol": "authkey10",'
+ ' "usm3DESEDEPrivProtocol": "privkey10"'
+ ' },'
+ ' {'
+ ' "user": "user11",'
+ ' "engineId": "8000000000000011",'
+ ' "usmHMACSHAAuthProtocol": "authkey11",'
+ ' "usmAesCfb128Protocol": "privkey11"'
+ ' },'
+ ' {'
+ ' "user": "user12",'
+ ' "engineId": "8000000000000012",'
+ ' "usmHMACSHAAuthProtocol": "authkey12",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey12"'
+ ' },'
+ ' {'
+ ' "user": "user13",'
+ ' "engineId": "8000000000000013",'
+ ' "usmHMACSHAAuthProtocol": "authkey13",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey13"'
+ ' },'
+ ' {'
+ ' "user": "user14",'
+ ' "engineId": "8000000000000014",'
+ ' "usmHMACSHAAuthProtocol": "authkey14",'
+ ' "usmAesCfb192Protocol": "privkey14"'
+ ' },'
+ ' {'
+ ' "user": "user15",'
+ ' "engineId": "8000000000000015",'
+ ' "usmHMACSHAAuthProtocol": "authkey15",'
+ ' "usmAesCfb256Protocol": "privkey15"'
+ ' },'
+ ' {'
+ ' "user": "user17",'
+ ' "engineId": "8000000000000017",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey17",'
+ ' "usmDESPrivProtocol": "privkey17"'
+ ' },'
+ ' {'
+ ' "user": "user18",'
+ ' "engineId": "8000000000000018",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey18",'
+ ' "usm3DESEDEPrivProtocol": "privkey18"'
+ ' },'
+ ' {'
+ ' "user": "user19",'
+ ' "engineId": "8000000000000019",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey19",'
+ ' "usmAesCfb128Protocol": "privkey19"'
+ ' },'
+ ' {'
+ ' "user": "user20",'
+ ' "engineId": "8000000000000020",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey20",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey20"'
+ ' },'
+ ' {'
+ ' "user": "user21",'
+ ' "engineId": "8000000000000021",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey21",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey21"'
+ ' },'
+ ' {'
+ ' "user": "user22",'
+ ' "engineId": "8000000000000022",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey22",'
+ ' "usmAesCfb192Protocol": "privkey22"'
+ ' },'
+ ' {'
+ ' "user": "user23",'
+ ' "engineId": "8000000000000023",'
+ ' "usmHMAC128SHA224AuthProtocol": "authkey23",'
+ ' "usmAesCfb256Protocol": "privkey23"'
+ ' },'
+ ' {'
+ ' "user": "user25",'
+ ' "engineId": "8000000000000025",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey25",'
+ ' "usmDESPrivProtocol": "privkey25"'
+ ' },'
+ ' {'
+ ' "user": "user26",'
+ ' "engineId": "8000000000000026",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey26",'
+ ' "usm3DESEDEPrivProtocol": "privkey26"'
+ ' },'
+ ' {'
+ ' "user": "user27",'
+ ' "engineId": "8000000000000027",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey27",'
+ ' "usmAesCfb128Protocol": "privkey27"'
+ ' },'
+ ' {'
+ ' "user": "user28",'
+ ' "engineId": "8000000000000028",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey28",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey28"'
+ ' },'
+ ' {'
+ ' "user": "user29",'
+ ' "engineId": "8000000000000029",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey29",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey29"'
+ ' },'
+ ' {'
+ ' "user": "user30",'
+ ' "engineId": "8000000000000030",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey30",'
+ ' "usmAesCfb192Protocol": "privkey30"'
+ ' },'
+ ' {'
+ ' "user": "user31",'
+ ' "engineId": "8000000000000031",'
+ ' "usmHMAC192SHA256AuthProtocol": "authkey31",'
+ ' "usmAesCfb256Protocol": "privkey31"'
+ ' },'
+ ' {'
+ ' "user": "user33",'
+ ' "engineId": "8000000000000033",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey33",'
+ ' "usmDESPrivProtocol": "privkey33"'
+ ' },'
+ ' {'
+ ' "user": "user34",'
+ ' "engineId": "8000000000000034",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey34",'
+ ' "usm3DESEDEPrivProtocol": "privkey34"'
+ ' },'
+ ' {'
+ ' "user": "user35",'
+ ' "engineId": "8000000000000035",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey35",'
+ ' "usmAesCfb128Protocol": "privkey35"'
+ ' },'
+ ' {'
+ ' "user": "user36",'
+ ' "engineId": "8000000000000036",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey36",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey36"'
+ ' },'
+ ' {'
+ ' "user": "user37",'
+ ' "engineId": "8000000000000037",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey37",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey37"'
+ ' },'
+ ' {'
+ ' "user": "user38",'
+ ' "engineId": "8000000000000038",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey38",'
+ ' "usmAesCfb192Protocol": "privkey38"'
+ ' },'
+ ' {'
+ ' "user": "user39",'
+ ' "engineId": "8000000000000039",'
+ ' "usmHMAC256SHA384AuthProtocol": "authkey39",'
+ ' "usmAesCfb256Protocol": "privkey39"'
+ ' },'
+ ' {'
+ ' "user": "user41",'
+ ' "engineId": "8000000000000041",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey41",'
+ ' "usmDESPrivProtocol": "privkey41"'
+ ' },'
+ ' {'
+ ' "user": "user42",'
+ ' "engineId": "8000000000000042",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey42",'
+ ' "usm3DESEDEPrivProtocol": "privkey42"'
+ ' },'
+ ' {'
+ ' "user": "user43",'
+ ' "engineId": "8000000000000043",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey43",'
+ ' "usmAesCfb128Protocol": "privkey43"'
+ ' },'
+ ' {'
+ ' "user": "user44",'
+ ' "engineId": "8000000000000044",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey44",'
+ ' "usmAesBlumenthalCfb192Protocol": "privkey44"'
+ ' },'
+ ' {'
+ ' "user": "user45",'
+ ' "engineId": "8000000000000045",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey45",'
+ ' "usmAesBlumenthalCfb256Protocol": "privkey45"'
+ ' },'
+ ' {'
+ ' "user": "user46",'
+ ' "engineId": "8000000000000046",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey46",'
+ ' "usmAesCfb192Protocol": "privkey46"'
+ ' },'
+ ' {'
+ ' "user": "user47",'
+ ' "engineId": "8000000000000047",'
+ ' "usmHMAC384SHA512AuthProtocol": "authkey47",'
+ ' "usmAesCfb256Protocol": "privkey47"'
+ ' },'
+ ' {'
+ ' "user": "user48",'
+ ' "engineId": "8000000000000048",'
+ ' "usmNoAuthProtocol": "authkey48",'
+ ' "usmNoPrivProtocol": "privkey48"'
+ ' },'
+ ' {'
+ ' "user": "user49",'
+ ' "engineId": "8000000000000049",'
+ ' "unknownAuthProtocol": "authkey49",'
+ ' "unknownProtocol": "privkey49"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_MISSING_USER = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "baduser": "user50",'
+ ' "engineId": "8000000000000050",'
+ ' "unknownAuthProtocol": "authkey50",'
+ ' "unknownProtocol": "privkey50"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_MISSING_ENGINE = (
+ ' "snmpv3_config": {'
+ ' "usm_users": ['
+ ' {'
+ ' "user": "user51",'
+ ' "badengineId": "8000000000000051",'
+ ' "unknownAuthProtocol": "authkey51",'
+ ' "unknownProtocol": "privkey51"'
+ ' }'
+ ' ]'
+ ' }'
+ )
+ JSON_COMMA = ','
+ JSON_END = '}'
+
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
+
+
def test_v3_config_present(self):
"""
Test that snmpv3 config is present
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" }, { "user": "user4"} , { "engineId": "1"}, { "user": "user6", "engineId": "8000000000000006", "usmNoAuth": "authkey3", "usmAesCfb192": "privkey6" }, { "user": "user7", "engineId": "8000000000000007", "usmNoAuth": "", "usmNoPriv": "" }] } }'
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_USERS +
+ test_snmpv3_config.JSON_END
)
+ tds.c_config = json.loads(pconfig)
- # del os.environ['CBS_SIM_JSON']
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
- with pytest.raises(Exception) as pytest_wrapped_sys_exit:
- snmp_engine = engine.SnmpEngine()
- result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
- # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3)
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
def test_v3_config_not_present(self):
"""
Test that app is ok if v3 config not present
"""
- tds.c_config = json.loads(
- '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } } }'
- )
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
- # del os.environ['CBS_SIM_JSON']
- # result = trapd_get_cbs_config.get_cbs_config()
- # print("result: %s" % result)
- # compare = str(result).startswith("{'snmptrap': ")
- # self.assertEqual(compare, False)
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
+
+
+ @unittest.skip("need to understand what happens when a username is missing")
+ def test_v3_config_missing_user(self):
+ """
+ Test that app is ok if v3 config has a missing user name
+ """
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_MISSING_USER +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
+
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
+
+
+ def test_v3_config_missing_engine(self):
+ """
+ Test that app is ok if v3 config has a missing engine name
+ """
+ pconfig = (
+ test_snmpv3_config.JSON_START +
+ test_snmpv3_config.JSON_COMMA +
+ test_snmpv3_config.JSON_MISSING_ENGINE +
+ test_snmpv3_config.JSON_END
+ )
+ tds.c_config = json.loads(pconfig)
- with pytest.raises(Exception) as pytest_wrapped_sys_exit:
- snmp_engine = engine.SnmpEngine()
- result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
- # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3)
- assert pytest_wrapped_sys_exit.type == SystemExit
- # assert pytest_wrapped_sys_exit.value.code == 1
+ snmp_engine = engine.SnmpEngine()
+ rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config)
+ self.assertEqual(rsnmp_engine, snmp_engine)
-if __name__ == "__main__":
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py
index 16b0a17..3041884 100644
--- a/tests/test_trapd_stormwatch.py
+++ b/tests/test_trapd_stormwatch.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,14 +14,15 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
import time
+from unittest.mock import patch
import trapd_stormwatch as sw
import trapd_stormwatch_settings as sws
import trapd_stats_settings as stats
+import trapd_settings as tds
class test_cleanup_and_exit(unittest.TestCase):
@@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase):
Test for presense of required vars
"""
- def test_increment_existing_counter(self):
- """
- Test increment counter
- """
- sw.sw_init()
+ @classmethod
+ def setUp(cls):
+ tds.init()
stats.init()
+ sw.sw_init()
+ sws.init()
- oid = ".1.2.3.4.5.6"
- sws.sw_config_oid_dict[oid] = True
- sws.sw_config_low_water_in_interval_dict[oid] = 1
- sws.sw_config_high_water_in_interval_dict[oid] = 10
-
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
-
- # try again, but with stats.total_notifications removed
- delattr(stats, "total_notifications")
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
+ def test_sw_init(self):
+ """ test sw_init() """
+ sw.sw_init()
+ self.assertEqual(sws.sw_interval_in_seconds, 60)
+ sws.init()
+ self.assertEqual(sws.sw_config_category, {})
- self.assertEqual(result, True)
def test_sw_clear_dicts(self):
"""
Test sw_clear_dicts
"""
- sw.sw_init()
- # initialize attributes not handled by sw_init()
- sws.sw_storm_counter_dict = {}
- stats.agent_counter_dict = {}
- stats.oid_counter_dict = {}
- sws.sw_config_category = {}
- # provide a value that can tested for
- sws.sw_storm_counter_dict["abc"] = "def"
+ with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'):
+ with patch.dict('trapd_stormwatch_settings.sw_config_category'):
+ with patch.dict('trapd_stats_settings.agent_counter_dict'):
+ with patch.dict('trapd_stats_settings.oid_counter_dict'):
+
+ # initialize attributes not handled by sw_init()
+ sws.sw_storm_counter_dict = {}
+ sws.sw_config_category = {}
+ stats.agent_counter_dict = {}
+ stats.oid_counter_dict = {}
+
+ # provide a value that can tested for
+ sws.sw_storm_counter_dict["abc"] = "def"
+ self.assertTrue("abc" in sws.sw_storm_counter_dict)
+
+ self.assertTrue(sw.sw_clear_dicts())
+ self.assertFalse("abc" in sws.sw_storm_counter_dict)
- sw.sw_clear_dicts()
- self.assertFalse("abc" in sws.sw_storm_counter_dict)
+ # now make sure we get an exception
+ sws.sw_config_category = 3
+ self.assertFalse(sw.sw_clear_dicts())
- # now make sure we get an exception
- sws.sw_config_category = 3
- self.assertFalse(sw.sw_clear_dicts())
- # clean up the attributes we added above
- delattr(sws, "sw_storm_counter_dict")
- delattr(stats, "agent_counter_dict")
- delattr(stats, "oid_counter_dict")
- delattr(sws, "sw_config_category")
+ def test_sw_load_trap_config(self):
+ """ Test sw_load_trap_config(_config) """
+ trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "trap_config": { },
+ "notify OID len": 10,
+ }
+
+ # normal operation, and variations
+ ret1 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret1, 0)
+ self.assertIsInstance(ret1, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ { "oidx": "1.3.6.1.4.1.9999.9.9.888" }
+ ]
+ ret2 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret2, 0)
+ self.assertIsInstance(ret2, int)
+
+ trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33
+ ret3 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret3, 0)
+ self.assertIsInstance(ret3, int)
+
+ trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50
+ ret4 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret4, 0)
+ self.assertIsInstance(ret4, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ {
+ "oid": "1.3.6.1.4.1.9999.9.9.888",
+ "sw_high_water_in_interval": 3,
+ "sw_low_water_in_interval": 2,
+ "category": "abc",
+ }
+ ]
+ ret5 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret5, 1)
+ self.assertIsInstance(ret5, int)
+
+ delattr(sws, 'sw_storm_active_dict')
+ ret6 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret6, 1)
+ self.assertIsInstance(ret6, int)
+
def test_sw_log_metrics(self):
"""
Test sw_clear_log_metrics
"""
- sw.sw_init()
-
stats.total_notifications = 3
stats.total_notifications = 50
sws.sw_interval_in_seconds = 30
stats.agent_counter_dict = {"a": 3, "b": 40}
stats.metric_log_notification_threshold_pct = 30
- sw.sw_log_metrics()
- # make sure we got this far
- assert True
+ with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger:
+ sw.sw_log_metrics()
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+
+
+ def test_increment_existing_counter(self):
+ """
+ Test increment counter. There should NOT be an exception.
+ """
+ oid = ".1.2.3.4.5.6"
+ sws.sw_config_oid_dict[oid] = True
+ sws.sw_config_low_water_in_interval_dict[oid] = 1
+ sws.sw_config_high_water_in_interval_dict[oid] = 10
+
+ loc_agent = "192.168.1.1"
+ stats.agent_counter_dict[loc_agent] = 2
+ stats.oid_counter_dict[oid] = 102
+
+ sv_total_notifications = stats.total_notifications
+ sv_agent_count_dict = stats.agent_counter_dict[loc_agent]
+ sv_oid_count_dict = stats.oid_counter_dict[oid]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1)
+
+ # try again, without agent_counter_dict[loc_agent]
+ del stats.agent_counter_dict[loc_agent]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+2)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2)
+
+ # try again, but with stats.total_notifications removed
+ delattr(stats, "total_notifications")
+
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except:
+ result = False
+
+ self.assertEqual(result, True, "stats.total_notifications removed")
+ self.assertEqual(stats.total_notifications, 1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 2)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3)
+
def test_sw_storm_active(self):
"""
Test sw_storm_active()
"""
- sw.sw_init()
+ print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}")
+
# initialize attributes not handled by sw_init()
stats.oid_counter_dict = {}
@@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase):
self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
# now force sws.sw_last_stormwatch_dict_analysis to an old value
- sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20
+ sws.sw_last_stormwatch_dict_analysis = (
+ int(time.time()) - sws.sw_interval_in_seconds - 20
+ )
# and make certain that stats.oid_counter_dict got cleared.
if not hasattr(stats, "oid_counter_dict"):
stats.oid_counter_dict = {}
@@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase):
# .get("abc") != None)
-if __name__ == "__main__":
+ @patch('trapd_stormwatch.ecomp_logger')
+ def test_sw_reset_counter_dict(self, magic_ecomp_logger):
+ """ test sw_reset_counter_dict() """
+
+ loc_agent = "192.168.1.1"
+ loc_oid = ".1.2.3.4.5.6"
+ loc_agent_oid = loc_agent + " " + loc_oid
+
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 2)
+
+ # cases:
+ # for lao in storm_active_dict:
+ # storm_counter_dict[lao] >= high_water_val[lo]
+ # storm_counter_dict[lao] < high_water_val[lo]
+ # storm_counter_dict[lao] < low_water_val[lo]
+ # storm_counter_dict[lao] >= low_water_val[lo]
+
+ with patch.dict(sws.sw_storm_counter_dict, {
+ loc_agent_oid: 20
+ }):
+ # values around the 20 above
+ for high_water_mark in [2, 60]:
+ # values around the 20 above
+ for low_water_mark in [0, 30]:
+ with patch.dict(sws.sw_config_high_water_in_interval_dict, {
+ loc_oid: high_water_mark
+ }):
+ with patch.dict(sws.sw_config_low_water_in_interval_dict, {
+ loc_oid: low_water_mark
+ }):
+ with patch.dict(sws.sw_storm_active_dict, {
+ loc_agent_oid: "anything"
+ }):
+ sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid]
+ magic_ecomp_logger.call_count = 0
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+ self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0)
+
+
+if __name__ == "__main__": # pragma: no cover
# sws.init()
unittest.main()
diff --git a/tests/test_trapd_stormwatch_settings.py b/tests/test_trapd_stormwatch_settings.py
index bcb04a7..f5ad9a7 100644
--- a/tests/test_trapd_stormwatch_settings.py
+++ b/tests/test_trapd_stormwatch_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
@@ -34,26 +33,16 @@ class test_cleanup_and_exit(unittest.TestCase):
Test nosuch var
"""
sws.init()
- try:
- sws.no_such_var
- result = True
- except:
- result = False
+ self.assertFalse(hasattr(sws, 'no_such_var'))
- self.assertEqual(result, False)
def test_storm_counter_dict(self):
"""
Test storm_counter_dict
"""
sws.init()
- try:
- sws.sw_storm_counter_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_storm_counter_dict'))
- self.assertEqual(result, True)
def test_storm_active_dict(self):
"""
@@ -61,13 +50,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_storm_active_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_storm_active_dict'))
- self.assertEqual(result, True)
def test_sw_config_oid_dict(self):
"""
@@ -75,13 +59,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_oid_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_oid_dict'))
- self.assertEqual(result, True)
def test_sw_config_low_water_in_interval_dict(self):
"""
@@ -89,13 +68,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_low_water_in_interval_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_low_water_in_interval_dict'))
- self.assertEqual(result, True)
def test_sw_config_high_water_in_interval_dict(self):
"""
@@ -103,13 +77,8 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_high_water_in_interval_dict
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_high_water_in_interval_dict'))
- self.assertEqual(result, True)
def test_sw_config_category(self):
"""
@@ -117,43 +86,26 @@ class test_cleanup_and_exit(unittest.TestCase):
"""
sws.init()
- try:
- sws.sw_config_category
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_config_category'))
- self.assertEqual(result, True)
def test_sw_interval_in_seconds(self):
"""
Test sw_interval
"""
-
sws.init()
- try:
- str(sws.sw_interval_in_seconds).isnumeric()
- result = True
- except:
- result = False
+ self.assertTrue(hasattr(sws, 'sw_interval_in_seconds'))
+ self.assertTrue(str(sws.sw_interval_in_seconds).isnumeric())
- self.assertEqual(result, True)
def test_sw_last_stormwatch_dict_analysis(self):
"""
Test last_stormwatch_dict_analysis
"""
-
sws.init()
- try:
- str(sws.sw_last_stormwatch_dict_analysis).isnumeric()
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
+ self.assertTrue(hasattr(sws, 'sw_last_stormwatch_dict_analysis'))
+ self.assertTrue(str(sws.sw_last_stormwatch_dict_analysis).isnumeric())
-if __name__ == "__main__":
- # sws.init()
- unittest.main()
+if __name__ == "__main__": # pragma: no cover
+ unittest.main(verbosity=2)
diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py
index 5792b8c..28150c9 100644
--- a/tests/test_trapd_vb_types.py
+++ b/tests/test_trapd_vb_types.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,18 +14,10 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
-import json
import unittest
-import os
-from onap_dcae_cbs_docker_client.client import get_config
-from trapd_exit import cleanup_and_exit
-from trapd_io import stdout_logger, ecomp_logger
-import trapd_settings as tds
import trapd_vb_types
-
-from pysnmp.entity import engine, config
+import trapd_settings as tds
class test_trapd_vb_types(unittest.TestCase):
@@ -33,121 +25,102 @@ class test_trapd_vb_types(unittest.TestCase):
Test snmpv3 module
"""
- good_varbind_types = [
- "Integer",
- "Unsigned32",
- "Counter32",
- "OctetString",
- "ObjectIdentifier",
- "TimeTicks",
- "IpAddress",
- ]
+ @classmethod
+ def setUpClass(cls):
+ tds.init()
- def trapd_vb_type_conversion_integer(self):
+
+ def test_trapd_vb_type_conversion_integer32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32"), "integer")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32")
- self.assertEqual(result, "integer")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_integer(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer"), "integer")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer")
- self.assertEqual(result, "integer")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_gauge32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32"), "unsigned")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32")
- self.assertEqual(result, "unsigned")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_counter32(self):
"""
Test that pysnmp varbind types Integer converts
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32"), "counter32")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32")
- self.assertEqual(result, "counter32")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_octetstring(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString"), "octet")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString")
- self.assertEqual(result, "octet")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_py_type_5(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5"), "hex")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5")
- self.assertEqual(result, "hex")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_py_type_6(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6"), "decimal")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6")
- self.assertEqual(result, "decimal")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_null(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null"), "null")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null")
- self.assertEqual(result, "null")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_objectidentifier(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier"), "oid")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier")
- self.assertEqual(result, "oid")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_timeticks(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks"), "timeticks")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks")
- self.assertEqual(result, "timeticks")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_ipaddress(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress"), "ipaddress")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress")
- self.assertEqual(result, "ipaddress")
- def trapd_vb_type_conversion_integer(self):
+ def test_trapd_vb_type_conversion_bits(self):
"""
Test that pysnmp varbind types convert accurately
"""
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits"), "bits")
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits")
- self.assertEqual(result, "bits")
- def trapd_vb_type_conversion_invalid(self):
+ def test_trapd_vb_type_conversion_invalid(self):
"""
Test that pysnmp varbind types convert accurately
"""
-
- result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType")
# should return default of octet if not defined
- self.assertEqual(result, "octet")
+ self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType"), "octet")
-if __name__ == "__main__":
- unittest.main()
+if __name__ == "__main__": # pragma: no cover
+ unittest.main(verbosity=2)