diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_snmptrapd.py | 610 | ||||
-rw-r--r-- | tests/test_trapd_exit.py | 26 | ||||
-rw-r--r-- | tests/test_trapd_get_cbs_config.py | 221 | ||||
-rw-r--r-- | tests/test_trapd_http_session.py | 63 | ||||
-rw-r--r-- | tests/test_trapd_io.py | 521 | ||||
-rw-r--r-- | tests/test_trapd_runtime_pid.py | 39 | ||||
-rw-r--r-- | tests/test_trapd_settings.py | 48 | ||||
-rw-r--r-- | tests/test_trapd_snmpv3.py | 471 | ||||
-rw-r--r-- | tests/test_trapd_stormwatch.py | 250 | ||||
-rw-r--r-- | tests/test_trapd_stormwatch_settings.py | 76 | ||||
-rw-r--r-- | tests/test_trapd_vb_types.py | 95 |
11 files changed, 1889 insertions, 531 deletions
diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py index dee2aa0..736e114 100644 --- a/tests/test_snmptrapd.py +++ b/tests/test_snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,17 @@ # limitations under the License. # ============LICENSE_END========================================================= +import copy +import datetime import os -import pytest import unittest +from pathlib import Path +import time +from unittest.mock import patch, Mock + +import requests + import snmptrapd -import datetime import trapd_settings as tds import trapd_stormwatch_settings as sws @@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase): Test the save_pid mod """ - pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' + class WriteThrows(): + def write(): + raise RuntimeError("write() throws") + + + @classmethod + def setUpClass(cls): + + # init vars + tds.init() + sw.sw_init() + + # fmt: off + test_snmptrapd.pytest_empty_data = "{}" + test_snmptrapd.pytest_json_data = ( + '{ "snmptrapd": { ' + ' "version": "2.0.3", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + ' "protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + ' "cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + ' "publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + ' "streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + ' "files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + ' "trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + ' "snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + ' ] } }' + ) + # fmt: off + + test_snmptrapd.trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "notify OID len": 10, + } + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # create copy of snmptrapd.json for pytest + test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" + with open(test_snmptrapd.pytest_json_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_json_data) + + test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json" + with open(test_snmptrapd.pytest_empty_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_empty_data) - # create copy of snmptrapd.json for pytest - pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" - with open(pytest_json_config, "w") as outfile: - outfile.write(pytest_json_data) def test_usage_err(self): """ Test usage error """ - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = snmptrapd.usage_err() - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 1 + with self.assertRaises(SystemExit) as exc: + snmptrapd.usage_err() + self.assertEqual(str(exc.exception), "1") + def test_load_all_configs(self): """ Test load of all configs """ + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) - # init vars - tds.init() - sw.sw_init() + result = trapd_get_cbs_config.get_cbs_config() + self.assertEqual(result, True) - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + # request load of CBS data + self.assertEqual(snmptrapd.load_all_configs(0, 1), True) + + + def test_resolve_ip(self): + """ Test resolve_ip """ + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + time_base = 1000000 + time_offset = 10000 + with patch('time.time', return_value=time_base): + self.assertEqual(time.time(), time_base) + + fqdn = "foo.example" + ip = "1.2.3.4" + with patch.dict(tds.dns_cache_ip_to_name): + tds.dns_cache_ip_to_name = { } + # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires + # and gethostbyaddr() fails + with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")): + self.assertEqual(snmptrapd.resolve_ip(ip), ip) + self.assertEqual(tds.dns_cache_ip_to_name[ip], ip) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + tds.dns_cache_ip_to_name = { ip: fqdn } + with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])): + # EXCEPTION - nothing in tds.dns_cache_ip_expires + del tds.dns_cache_ip_expires[ip] + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset) - # request load of CBS data - result = snmptrapd.load_all_configs(0, 1) - self.assertEqual(result, True) def test_load_all_configs_signal(self): """ @@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + # request load of CBS data + self.assertTrue(snmptrapd.load_all_configs(1, 1)) + + with patch('snmptrapd.get_cbs_config', return_value=False): + with self.assertRaises(SystemExit): + snmptrapd.load_all_configs(1, 1) - # request load of CBS data - result = snmptrapd.load_all_configs(1, 1) - self.assertEqual(result, True) def test_log_all_arriving_traps(self): """ Test logging of traps """ - # init vars tds.init() - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - - # set last day to current - tds.last_day = datetime.datetime.now().day - - # trap dict for logging - tds.trap_dict = { - "uuid": "06f6e91c-3236-11e8-9953-005056865aac", - "agent address": "1.2.3.4", - "agent name": "test-agent.nodomain.com", - "cambria.partition": "test-agent.nodomain.com", - "community": "", - "community len": 0, - "epoch_serno": 15222068260000, - "protocol version": "v2c", - "time received": 1522206826.2938566, - "trap category": "ONAP-COLLECTOR-SNMPTRAP", - "sysUptime": "218567736", - "notify OID": "1.3.6.1.4.1.9999.9.9.999", - "notify OID len": 10, - } + # don't open files, but try to log - should raise exception + with self.assertRaises(Exception) as exc: + snmptrapd.log_all_arriving_traps() + self.assertIsInstance(exc.exception, TypeError) - # open eelf logs - trapd_io.open_eelf_logs() - - # open trap logs - tds.arriving_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + (tds.c_config["files"]["arriving_traps_log"]) - ) - tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) - - # name and open json trap log - tds.json_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + "DMAAP_" - + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) - + ".json" - ) - tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) - msg = "published traps logged to: %s" % tds.json_traps_filename - trapd_io.stdout_logger(msg) - trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + result = trapd_get_cbs_config.get_cbs_config() + + # set last day to current + tds.last_day = datetime.datetime.now().day + + + # open eelf logs + trapd_io.open_eelf_logs() + + # open trap logs + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + (tds.c_config["files"]["arriving_traps_log"]) + ) + tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) + + # name and open json trap log + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + "DMAAP_" + + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) + + ".json" + ) + tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) + msg = "published traps logged to: %s" % tds.json_traps_filename + trapd_io.stdout_logger(msg) + trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # also force it to daily roll + snmptrapd.log_all_arriving_traps() + + # try again, but with day rolling + tds.last_day = datetime.datetime.now().day - 1 + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to minute + tds.last_minute = datetime.datetime.now().minute - 1 + tds.c_config["files"]["roll_frequency"] = "minute" + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to hour + tds.last_hour = datetime.datetime.now().hour - 1 + tds.c_config["files"]["roll_frequency"] = "hour" + snmptrapd.log_all_arriving_traps() + + # try again, with a bad trap_dict[time_received] + tds.trap_dict["time received"] = "bad_value" + snmptrapd.log_all_arriving_traps() + + # also test log_published_messages() + snmptrapd.log_published_messages("data #1") + + # work even if there is an exception + # this SHOULD be done with a context + sv_json_traps_fd = tds.json_traps_fd + tds.json_traps_fd = test_snmptrapd.WriteThrows() + snmptrapd.log_published_messages("data #2") + tds.json_traps_fd = sv_json_traps_fd - # don't open files, but try to log - should raise exception - with pytest.raises(Exception) as pytest_wrapped_exception: - result = snmptrapd.log_all_arriving_traps() - assert pytest_wrapped_exception.type == AttributeError def test_log_all_incorrect_log_type(self): """ @@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + # open eelf logs + trapd_io.open_eelf_logs() - # open eelf logs - trapd_io.open_eelf_logs() def test_v1_trap_receipt(self): """ @@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() - - errorIndication, errorStatus, errorIndex, varbinds = next( - sendNotification( - SnmpEngine(), - CommunityData("not_public"), - UdpTransportTarget(("localhost", 6162)), - ContextData(), - "trap", - [ - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), - ], + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("not_public"), + UdpTransportTarget(("localhost", 6162)), + ContextData(), + "trap", + [ + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), + ], + ) ) - ) - result = errorIndication - self.assertEqual(result, None) + result = errorIndication + self.assertEqual(result, None) + + + def test_post_dmaap(self): + """ + Test post_dmaap() + """ + + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger: + with patch('requests.Session.post') as magic_session_post: + fake_post_resp = Mock() + magic_session_post.return_value = fake_post_resp + + fake_post_resp.status_code = requests.codes.moved_permanently + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 11) + + fake_post_resp.status_code = requests.codes.ok + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 16) + + magic_ecomp_logger.call_count = 0 + tds.traps_since_last_publish = 1 + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + magic_ecomp_logger.call_count = 0 + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username" + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password" + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + # for some reason this exception is being seen as an OSError ???? + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = requests.exceptions.RequestException("test throw") + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = OSError() + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + + @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method") + def test_comm_string_rewrite_observer(self): + """ + test comm_string_rewrite_observer() + """ + vars = { "communityName": ["name"] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "public") + + vars = { "communityName": [] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "") + + + def test_snmp_engine_observer_cb(self): + """ + test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): + """ + + snmp_engine = "snmp_engine" + execpoint = "execpoint" + cbCtx = "cbCtx" + variables = { + 'transportDomain': [ 1, 2, 3 ], + 'transportAddress': [ "a", "b", "c" ] + } + for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]: + variables["securityModel"] = secmodel + snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx) + self.assertEqual(tds.trap_dict["protocol version"], ret) -if __name__ == "__main__": + def test_add_varbind_to_log_string(self): + """ + test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val) + """ + vb_oid = "vb_oid" + vb_type = "vb_type" + + class TempPP(): + def prettyPrint(self): + return "pp ret" + + vb_val = TempPP() + + self.assertEqual(tds.all_vb_str, "") + + snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret') + + snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret') + + + def test_add_varbind_to_json(self): + """ + test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val) + """ + + class TempPP(): + def __init__(self, ret): + self.ret = ret + def prettyPrint(self): + return self.ret + + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + vb_oid = TempPP("1.2.3") + override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0") + + vb_type = "vb_type" + + vb_val = TempPP("foo.example") + + self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["notify OID"], ".foo.example") + self.assertEqual(tds.trap_dict["notify OID len"], 2) + + with patch('snmptrapd.resolve_ip') as magic_resolve_ip: + magic_resolve_ip.return_value = 'foo.example' + self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["agent address"], "foo.example") + self.assertEqual(tds.trap_dict["agent name"], "foo.example") + + sv_protocol_version = tds.trap_dict["protocol version"] + tds.trap_dict["protocol version"] = "v1" + self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1) + + tds.trap_dict["protocol version"] = sv_protocol_version + self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + + @patch('snmptrapd.log_all_arriving_traps') + @patch('snmptrapd.post_dmaap', return_value = 0) + @patch('snmptrapd.ecomp_logger', return_value = 0) + @patch('snmptrapd.add_varbind_to_json', return_value = 1) + @patch('snmptrapd.add_varbind_to_log_string', return_value = 0) + def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json, + magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps): + """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """ + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('trapd_stormwatch.sw_storm_active', return_value=True): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertFalse(tds.first_trap) + self.assertEqual(magic_ecomp_logger.call_count, 8) + + magic_ecomp_logger.call_count = 0 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 1 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 100 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0 + tds.last_pub_time = 0 + with patch('time.time', return_value=0): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.last_pub_time = 100000 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1 + with patch('time.time', return_value=10): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py index 0e38461..371b308 100644 --- a/tests/test_trapd_exit.py +++ b/tests/test_trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -29,27 +28,26 @@ class test_cleanup_and_exit(unittest.TestCase): def test_normal_exit(self): """ - Test normal exit works as expected + Test normal exit works as expected, and exits with the 1st arg """ - open(pid_file, "w") + # create an empty pid file + with open(pid_file, "w"): + pass - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + with self.assertRaises(SystemExit) as exc: result = trapd_exit.cleanup_and_exit(0, pid_file) - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 0 + self.assertEqual(str(exc.exception), "0") - # compare = str(result).startswith("SystemExit: 0") - # self.assertEqual(compare, True) def test_abnormal_exit(self): """ - Test exit with missing PID file exits non-zero + Test exit with missing PID file. Still exits with the 1st arg. """ - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + + with self.assertRaises(SystemExit) as exc: result = trapd_exit.cleanup_and_exit(0, pid_file_dne) - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 1 + self.assertEqual(str(exc.exception), "0") -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py index b7c5d7b..4064b78 100644 --- a/tests/test_trapd_get_cbs_config.py +++ b/tests/test_trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,101 +14,198 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest -import unittest +import json import os import sys +import unittest +from unittest.mock import patch +from pathlib import Path -from onap_dcae_cbs_docker_client.client import get_config -from trapd_exit import cleanup_and_exit -from trapd_io import stdout_logger, ecomp_logger -import trapd_settings as tds import trapd_get_cbs_config -from pathlib import Path -# # # # # # # # -# ENV setup -# # # # # # # # - -# required directory tree -try: - Path("/tmp/opt/app/snmptrap/logs").mkdir(parents=True, exist_ok=True) - Path("/tmp/opt/app/snmptrap/tmp").mkdir(parents=True, exist_ok=True) -except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) - -# env var for CBS_SIM_JSON -try: - os.environ["CBS_SIM_JSON"] = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" -except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) - -pytest_json_data = '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' - -# create snmptrapd.json for pytest -pytest_json_config = os.getenv("CBS_SIM_JSON") -with open(pytest_json_config, "w") as outfile: - outfile.write(pytest_json_data) -outfile.close() - -# test class/methods -class test_get_cbs_config(unittest.TestCase): +class test_trapd_get_cbs_config(unittest.TestCase): """ Test the trapd_get_cbs_config mod """ + snmptrap_dir = "/tmp/opt/app/snmptrap" + json_dir = snmptrap_dir + "/etc" + + # fmt: off + pytest_json_data = json.loads( + '{' + '"snmptrapd": { ' + ' "version": "1.4.0", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + '"protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + '"cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + '"publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + '"streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + '"files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + '"trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + '"snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + '] } }' + ) + # fmt: on + + + @classmethod + def setUpClass(cls): + """ set up the required directory tree """ + try: + Path(test_trapd_get_cbs_config.snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(test_trapd_get_cbs_config.snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(test_trapd_get_cbs_config.snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + + def write_config(self, filename, config): + """ + write a config file + """ + # create snmptrapd.json for pytest + with open(filename, "w") as outfile: + json.dump(config, outfile) + + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"}) def test_cbs_fallback_env_present(self): """ Test that CBS fallback env variable exists and we can get config from fallback env var """ - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, True) - self.assertEqual(result, True) + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json" + self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", test_trapd_get_cbs_config.pytest_json_data) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/snmptrapd.json"}) + def test_cbs_fallback_env_present_bad_numbers(self): + """ + Test as in test_cbs_fallback_env_present(), but with + various values reset to be non-numeric. + """ + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/snmptrapd.json" + with patch.dict(test_trapd_get_cbs_config.pytest_json_data): + test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_milliseconds_between_retries"] = "notanumber" + test_trapd_get_cbs_config.pytest_json_data["files"]["minimum_severity_to_log"] = "notanumber" + test_trapd_get_cbs_config.pytest_json_data["publisher"]["http_retries"] = "notanumber" + self.write_config(test_trapd_get_cbs_config.json_dir + "/snmptrapd.json", + test_trapd_get_cbs_config.pytest_json_data) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + + @patch.dict(os.environ, {"CBS_SIM_JSON": json_dir + "/nosuchfile.json"}) def test_cbs_override_env_invalid(self): """ """ - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/nosuchfile.json") - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + assert os.getenv("CBS_SIM_JSON") == test_trapd_get_cbs_config.json_dir + "/nosuchfile.json" - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: + with self.assertRaises(SystemExit) as exc: result = trapd_get_cbs_config.get_cbs_config() - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 + self.assertEqual(str(exc.exception), "1") + + @patch.dict(os.environ, {"CONSUL_HOST": "localhost"}) def test_cbs_env_present(self): """ Test that CONSUL_HOST env variable exists but fails to respond """ - os.environ.update(CONSUL_HOST="localhost") + self.assertEqual(os.getenv("CONSUL_HOST"), "localhost") + del os.environ["CBS_SIM_JSON"] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + self.assertNotIn("CBS_SIM_JSON", os.environ) - with pytest.raises(SystemExit) as sys_exit: + with self.assertRaises(SystemExit) as exc: trapd_get_cbs_config.get_cbs_config() - assert sys_exit.value.errno == errno.ECONNREFUSED + + @patch.dict(os.environ, {}) def test_cbs_override_env_undefined(self): """ """ - print("------>>> RUNNING test_no_cbs_override_env_var:") del os.environ["CBS_SIM_JSON"] + self.assertNotIn("CBS_SIM_JSON", os.environ) - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - assert trapd_get_cbs_config.get_cbs_config() == SystemExit + with self.assertRaises(SystemExit) as exc: + trapd_get_cbs_config.get_cbs_config() -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py index 7ec89a6..667a454 100644 --- a/tests/test_trapd_http_session.py +++ b/tests/test_trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,9 +14,11 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_http_session +import requests +from unittest.mock import Mock, patch +import trapd_settings as tds class test_init_session_obj(unittest.TestCase): @@ -24,39 +26,62 @@ class test_init_session_obj(unittest.TestCase): Test the init_session_obj mod """ - def close_nonexisting_session(self): + @classmethod + def setUpClass(cls): + tds.init() + + def test_init_session_obj(self): """ - test close of existing http session + test creation of http session """ - sess = "no session" - result = trapd_http_session.close_session_obj(sess) - self.assertEqual(result, True) + self.assertIsInstance(trapd_http_session.init_session_obj(), requests.sessions.Session) + - def init_session(self): + def test_init_session_obj_raises(self): """ - test creation of http session + test close when the requests.Session() method throws an exception + """ + with patch('requests.Session') as magic_requests: + magic_requests.side_effect = RuntimeError("Session() throws via mock") + with self.assertRaises(SystemExit) as exc: + trapd_http_session.init_session_obj() + self.assertEqual(str(exc.exception), "1") + + + def test_close_nonexisting_session(self): """ - result = trapd_http_session.init_session_obj() - compare = str(result).startswith("<requests.sessions.Session object at") - self.assertEqual(compare, True) + test close of non-existing http session + """ + self.assertIsNone(trapd_http_session.close_session_obj(None)) + + + def test_close_nonexisting_close_raises(self): + """ + test close when the session.close() method throws + """ + class CloseThrows(): + def close(self): + raise RuntimeError("close() throws") + + with self.assertRaises(SystemExit): + trapd_http_session.close_session_obj(CloseThrows()) + def test_reset(self): """ test reset of existing http session """ sess = trapd_http_session.init_session_obj() - result = trapd_http_session.reset_session_obj(sess) - compare = str(result).startswith("<requests.sessions.Session object at") - self.assertEqual(compare, True) + self.assertIsInstance(trapd_http_session.reset_session_obj(sess), requests.sessions.Session) + - def close_existing_session(self): + def test_close_existing_session(self): """ test close of existing http session """ sess = trapd_http_session.init_session_obj() - result = trapd_http_session.close_session_obj(sess) - self.assertEqual(result, True) + self.assertTrue(trapd_http_session.close_session_obj(sess)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_io.py b/tests/test_trapd_io.py index c1702aa..f3d1a4c 100644 --- a/tests/test_trapd_io.py +++ b/tests/test_trapd_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +14,21 @@ # limitations under the License. # ============LICENSE_END========================================================= +import datetime +import glob +import io +import json import os -import pytest +from pathlib import Path +import sys +import tempfile import unittest +from unittest.mock import patch + import snmptrapd -import datetime -import json import trapd_settings as tds import trapd_runtime_pid import trapd_io -import sys -from pathlib import Path class test_trapd_io(unittest.TestCase): @@ -32,32 +36,141 @@ class test_trapd_io(unittest.TestCase): Test the save_pid mod """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' - ) + class PseudoFile(): + """ test file-like object that does nothing """ + def write(self): + pass + def close(self): + pass - def test_roll_all_files_notopen(self): - """ - Test rolling files that aren't open - """ + class WriteThrows(): + """ test file-like object that throws on a write """ + def write(self): + raise RuntimeError("close() throws") + + + @classmethod + def setUpClass(cls): + + tds.init() + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # fmt: off + tds.c_config = json.loads( + '{ "snmptrapd": { ' + ' "version": "1.4.0", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + '"protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + '"cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + '"publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + '"streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + '"files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + '"trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + '"snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + '] } }' + ) + # fmt: on + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json" + ) + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log" + ) - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_error_file(self): """ Test bad error file location """ - # open eelf error logs - tds.c_config["files.eelf_error"] = "/bad_dir/error.log" + with patch.dict(tds.c_config["files"]): + # open eelf error logs + tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_debug_file(self): """ @@ -65,86 +178,248 @@ class test_trapd_io(unittest.TestCase): """ # open eelf debug logs - tds.c_config["files.eelf_debug"] = "/bad_dir/debug.log" + with patch.dict(tds.c_config["files"]): + tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_audit_file(self): """ Test bad audit file location """ - # open eelf debug logs - tds.c_config["files.eelf_audit"] = "/bad_dir/audit.log" + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_metrics_file(self): """ Test bad metrics file location """ + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_error_file_missing_name(self): + """ + Test bad error file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf error logs + del tds.c_config["files"]["eelf_error"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_debug_file_missing_name(self): + """ + Test bad debug file location + """ + # open eelf debug logs - tds.c_config["files.eelf_metrics"] = "/bad_dir/metrics.log" + with patch.dict(tds.c_config["files"]): + del tds.c_config["files"]["eelf_debug"] - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - def test_roll_all_logs(self): + + def test_open_eelf_audit_file_missing_name(self): + """ + Test bad audit file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_audit"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_metrics_file_missing_name(self): + """ + Test bad metrics file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_metrics"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_roll_all_logs_not_open(self): """ Test roll of logs when not open """ - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to roll logs when not open. Shouldn't fail + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + def test_roll_all_logs(self): + """ + Test rolling files that they are open + """ + + trapd_io.open_eelf_logs() + # try to roll logs + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_roll_file_throws(self): + """ + Test rolling files that they are open + but roll_file throws an exception + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.roll_file') as roll_file_throws: + roll_file_throws.side_effect = RuntimeError("roll_file() throws") + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_eelf_logs_returns_false(self): + """ + Test rolling files that they are open + but open_eelf_logs returns false + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws: + open_eelf_logs_throws.return_value = False + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_json_traps_throws(self): + """ + Test rolling files that they are open + but open_file(json_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.json_traps_filename: + raise RuntimeError("json_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_arriving_traps_throws(self): + """ + Test rolling files that they are open + but open_file(arriving_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.arriving_traps_filename: + raise RuntimeError("arriving_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + def test_roll_file(self): """ Test roll of individual file when not present """ + # try to roll a valid log file + with tempfile.TemporaryDirectory() as ntd: + fn = ntd + "/test.log" + with open(fn, "w") as ofp: + self.assertTrue(trapd_io.roll_file(fn)) + # The file will be renamed to something like + # test.log.2022-08-17T20:28:32 + self.assertFalse(os.path.exists(fn)) + # We could also add a test to see if there is a file + # with a name like that. + files = list(glob.glob(f"{ntd}/*")) + print(f"files={files}") + self.assertEqual(len(files), 1) + self.assertTrue(files[0].startswith(fn + ".")) + + + def test_roll_file_not_present(self): + """ + Test roll of individual file when not present + """ + # try to roll logs when not open - result = trapd_io.roll_file("/file/not/present") - self.assertEqual(result, False) + self.assertFalse(trapd_io.roll_file("/file/not/present")) + def test_roll_file_no_write_perms(self): """ try to roll logs when not enough perms """ - no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" - no_perms_file = "test.dat" - no_perms_fp = no_perms_dir + "/" + no_perms_file + with tempfile.TemporaryDirectory() as no_perms_dir: + # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" + no_perms_file = "test.dat" + no_perms_fp = no_perms_dir + "/" + no_perms_file - # required directory tree - try: - Path(no_perms_dir).mkdir(parents=True, exist_ok=True) - os.chmod(no_perms_dir, 0o777) - except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) + # required directory tree + #try: + # Path(no_perms_dir).mkdir(parents=True, exist_ok=True) + # os.chmod(no_perms_dir, 0o700) + #except Exception as e: + # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + + # create empty file + open(no_perms_fp, "w").close() + os.chmod(no_perms_dir, 0o555) - # create empty file - open(no_perms_fp, "a").close() - os.chmod(no_perms_dir, 0o444) + # try to roll file in dir with no write perms + self.assertFalse(trapd_io.roll_file(no_perms_fp)) - result = trapd_io.roll_file(no_perms_fp) - self.assertEqual(result, False) + # the file should still be there + open(no_perms_fp).close() + + # allow the directory to be removed + os.chmod(no_perms_dir, 0o700) - # try to roll file in dir with no write perms - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_file(no_perms_fp) - assert pytest_wrapped_exception.type == SystemExit def test_open_file_exists(self): """ @@ -156,8 +431,9 @@ class test_trapd_io(unittest.TestCase): # try to roll logs when not open result = trapd_io.open_file(test_file) - compare = str(result).startswith("<_io.TextIOWrapper name=") - self.assertEqual(compare, True) + self.assertTrue(str(result).startswith("<_io.TextIOWrapper name=")) + self.assertIsInstance(result, io.TextIOWrapper) + def test_open_file_exists_does_not_exist(self): """ @@ -168,9 +444,9 @@ class test_trapd_io(unittest.TestCase): test_file = "/tmp/no_such_dir/snmptrap_pytest" # try to open file when dir not present - with pytest.raises(SystemExit) as pytest_wrapped_exception: + with self.assertRaises(SystemExit): result = trapd_io.open_file(test_file) - assert pytest_wrapped_exception.type == SystemExit + def test_close_file_exists(self): """ @@ -182,18 +458,113 @@ class test_trapd_io(unittest.TestCase): test_file = trapd_io.open_file(test_file_name) # close active file - result = trapd_io.close_file(test_file, test_file_name) - self.assertEqual(result, True) + self.assertTrue(trapd_io.close_file(test_file, test_file_name)) + - def test_close_file_does_not_exists(self): + def test_close_file_does_not_exist(self): """ Test closing non-existent file """ # try to roll logs when not open - result = trapd_io.close_file(None, None) - self.assertEqual(result, False) + self.assertFalse(trapd_io.close_file(None, None)) + + + def test_ecomp_logger_type_error(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_error_bad_fd(self): + """ + test trapd_io.ecomp_logger, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_unknown_bad_fd(self): + """ + test trapd_io.ecomp_logger, unknown type, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_metrics(self): + """ + test trapd_io.ecomp_logger to metrics + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_metrics_bad_fd(self): + """ + test trapd_io.ecomp_logger to metrics, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_metrics_fd = tds.eelf_metrics_fd + tds.eelf_metrics_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_metrics_fd = sv_eelf_metrics_fd + + + def test_ecomp_logger_type_audit(self): + """ + test trapd_io.ecomp_logger to audit log + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_audit_bad_fd(self): + """ + test trapd_io.ecomp_logger to audit log, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_audit_fd = tds.eelf_audit_fd + tds.eelf_audit_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_audit_fd = sv_eelf_audit_fd + + + def test_ecomp_logger_type_unknown(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py index c6b8601..923f730 100644 --- a/tests/test_trapd_runtime_pid.py +++ b/tests/test_trapd_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,24 +14,30 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest +import os import unittest -import trapd_runtime_pid +from unittest.mock import patch + import trapd_io +import trapd_runtime_pid -class test_save_pid(unittest.TestCase): +class test_trapd_runtime_pid(unittest.TestCase): """ Test the save_pid mod """ + GOOD_PID_FILE = "/tmp/snmptrap_test_pid_file" + BAD_PID_FILE = "/tmp/snmptrap_test_pid_file_not_there" + def test_correct_usage(self): """ Test that attempt to create pid file in standard location works """ - result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") + result = trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE) self.assertEqual(result, True) + def test_missing_directory(self): """ Test that attempt to create pid file in missing dir fails @@ -40,7 +46,6 @@ class test_save_pid(unittest.TestCase): self.assertEqual(result, False) -class test_rm_pid(unittest.TestCase): """ Test the rm_pid mod """ @@ -50,18 +55,26 @@ class test_rm_pid(unittest.TestCase): Test that attempt to remove pid file in standard location works """ # must create it before removing it - result = trapd_runtime_pid.save_pid("/tmp/snmptrap_test_pid_file") - self.assertEqual(result, True) - result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file") - self.assertEqual(result, True) + self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + self.assertTrue(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + def test_missing_file(self): """ Test that attempt to rm non-existent pid file fails """ - result = trapd_runtime_pid.rm_pid("/tmp/snmptrap_test_pid_file_9999") - self.assertEqual(result, False) + self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.BAD_PID_FILE)) + + + def test_correct_usage_but_throws(self): + """ + Test that an exception while removing returns false + """ + self.assertTrue(trapd_runtime_pid.save_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) + with patch('os.remove') as mock_remove: + mock_remove.side_effect = RuntimeError("os.remove throws") + self.assertFalse(trapd_runtime_pid.rm_pid(test_trapd_runtime_pid.GOOD_PID_FILE)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py index 92a3144..5625d78 100644 --- a/tests/test_trapd_settings.py +++ b/tests/test_trapd_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -24,66 +23,43 @@ pid_file_dne = "/tmp/test_pid_file_NOT" import trapd_settings as tds -class test_cleanup_and_exit(unittest.TestCase): +class test_trapd_settings(unittest.TestCase): """ Test for presense of required vars """ + @classmethod + def setUpClass(cls): + tds.init() + + def test_nonexistent_dict(self): """ Test nosuch var """ - tds.init() - try: - tds.no_such_var - result = True - except: - result = False + self.assertFalse(hasattr(tds, 'no_such_var')) - self.assertEqual(result, False) def test_config_dict(self): """ Test config dict """ - tds.init() - try: - tds.c_config - result = True - except: - result = False + self.assertTrue(hasattr(tds, 'c_config')) - self.assertEqual(result, True) def test_dns_cache_ip_to_name(self): """ Test dns cache name dict """ + self.assertTrue(hasattr(tds, 'dns_cache_ip_to_name')) - tds.init() - try: - tds.dns_cache_ip_to_name - result = True - except: - result = False - - self.assertEqual(result, True) def test_dns_cache_ip_expires(self): """ Test dns cache ip expires dict """ - - tds.init() - try: - tds.dns_cache_ip_expires - result = True - except: - result = False - - self.assertEqual(result, True) + self.assertTrue(hasattr(tds, 'dns_cache_ip_expires')) -if __name__ == "__main__": - # tds.init() +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_snmpv3.py b/tests/test_trapd_snmpv3.py index eac6082..2011953 100644 --- a/tests/test_trapd_snmpv3.py +++ b/tests/test_trapd_snmpv3.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,10 +14,10 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import json import unittest import os +import traceback from onap_dcae_cbs_docker_client.client import get_config from trapd_exit import cleanup_and_exit @@ -33,48 +33,459 @@ class test_snmpv3_config(unittest.TestCase): Test snmpv3 module """ + JSON_START = ( + '{' + ' "snmptrapd": {' + ' "version": "2.0",' + ' "title": "ONAP SNMP Trap Receiver"' + ' },' + ' "protocols": {' + ' "transport": "udp",' + ' "ipv4_interface": "0.0.0.0",' + ' "ipv4_port": 6162,' + ' "ipv6_interface": "::1",' + ' "ipv6_port": 6162' + ' },' + ' "cache": {' + ' "dns_cache_ttl_seconds": 10800' + ' },' + ' "publisher": {' + ' "http_milliseconds_timeout": 500,' + ' "http_retries": 2,' + ' "http_milliseconds_between_retries": 250,' + ' "http_primary_publisher": "true",' + ' "http_peer_publisher": "unavailable",' + ' "max_traps_between_publishes": 10,' + ' "max_milliseconds_between_publishes": 10000' + ' },' + ' "streams_publishes": {' + ' "sec_fault_unsecure": {' + ' "type": "message_router",' + ' "aaf_password": null,' + ' "dmaap_info": {' + ' "location": "mtl5",' + ' "client_id": null,' + ' "client_role": null,' + ' "topic_url": "http://localhost:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP"' + ' },' + ' "aaf_username": null' + ' }' + ' },' + ' "files": {' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap",' + ' "log_dir": "logs",' + ' "data_dir": "data",' + ' "pid_dir": "tmp",' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log",' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log",' + ' "traps_stats_log": "snmptrapd_stats.csv",' + ' "perm_status_file": "snmptrapd_status.log",' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs",' + ' "eelf_error": "error.log",' + ' "eelf_debug": "debug.log",' + ' "eelf_audit": "audit.log",' + ' "eelf_metrics": "metrics.log",' + ' "roll_frequency": "hour",' + ' "minimum_severity_to_log": 3' + ' },' + ' "check_hb_traps": {' + ' "trap_thr": 900,' + ' "hb_thr": 900,' + ' "hb_notify_oid": ".1.3.6.1.4.1.74.2.46.12.1.1"' + ' },' + ' "trap_config": {' + ' "sw_interval_in_seconds": 60,' + ' "metric_log_notification_threshold_pct": 25,' + ' "notify_oids": [' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.1",' + ' "sw_high_water_in_interval": 100,' + ' "sw_low_water_in_interval": 5,' + ' "category": "logonly"' + ' },' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.2",' + ' "sw_high_water_in_interval": 200,' + ' "sw_low_water_in_interval": 10,' + ' "category": "logonly"' + ' },' + ' {' + ' "oid": ".1.3.6.1.4.1.9.0.3",' + ' "sw_high_water_in_interval": 300,' + ' "sw_low_water_in_interval": 15,' + ' "category": "logonly"' + ' }' + ' ]' + ' }' + ) + JSON_USERS = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "user": "user1",' + ' "engineId": "8000000000000001",' + ' "usmHMACMD5AuthProtocol": "authkey1",' + ' "usmDESPrivProtocol": "privkey1"' + ' },' + ' {' + ' "user": "user2",' + ' "engineId": "8000000000000002",' + ' "usmHMACMD5AuthProtocol": "authkey2",' + ' "usm3DESEDEPrivProtocol": "privkey2"' + ' },' + ' {' + ' "user": "user3",' + ' "engineId": "8000000000000003",' + ' "usmHMACMD5AuthProtocol": "authkey3",' + ' "usmAesCfb128Protocol": "privkey3"' + ' },' + ' {' + ' "user": "user4",' + ' "engineId": "8000000000000004",' + ' "usmHMACMD5AuthProtocol": "authkey4",' + ' "usmAesBlumenthalCfb192Protocol": "privkey4"' + ' },' + ' {' + ' "user": "user5",' + ' "engineId": "8000000000000005",' + ' "usmHMACMD5AuthProtocol": "authkey5",' + ' "usmAesBlumenthalCfb256Protocol": "privkey5"' + ' },' + ' {' + ' "user": "user6",' + ' "engineId": "8000000000000006",' + ' "usmHMACMD5AuthProtocol": "authkey6",' + ' "usmAesCfb192Protocol": "privkey6"' + ' },' + ' {' + ' "user": "user7",' + ' "engineId": "8000000000000007",' + ' "usmHMACMD5AuthProtocol": "authkey7",' + ' "usmAesCfb256Protocol": "privkey7"' + ' },' + ' {' + ' "user": "user9",' + ' "engineId": "8000000000000009",' + ' "usmHMACSHAAuthProtocol": "authkey9",' + ' "usmDESPrivProtocol": "privkey9"' + ' },' + ' {' + ' "user": "user10",' + ' "engineId": "8000000000000010",' + ' "usmHMACSHAAuthProtocol": "authkey10",' + ' "usm3DESEDEPrivProtocol": "privkey10"' + ' },' + ' {' + ' "user": "user11",' + ' "engineId": "8000000000000011",' + ' "usmHMACSHAAuthProtocol": "authkey11",' + ' "usmAesCfb128Protocol": "privkey11"' + ' },' + ' {' + ' "user": "user12",' + ' "engineId": "8000000000000012",' + ' "usmHMACSHAAuthProtocol": "authkey12",' + ' "usmAesBlumenthalCfb192Protocol": "privkey12"' + ' },' + ' {' + ' "user": "user13",' + ' "engineId": "8000000000000013",' + ' "usmHMACSHAAuthProtocol": "authkey13",' + ' "usmAesBlumenthalCfb256Protocol": "privkey13"' + ' },' + ' {' + ' "user": "user14",' + ' "engineId": "8000000000000014",' + ' "usmHMACSHAAuthProtocol": "authkey14",' + ' "usmAesCfb192Protocol": "privkey14"' + ' },' + ' {' + ' "user": "user15",' + ' "engineId": "8000000000000015",' + ' "usmHMACSHAAuthProtocol": "authkey15",' + ' "usmAesCfb256Protocol": "privkey15"' + ' },' + ' {' + ' "user": "user17",' + ' "engineId": "8000000000000017",' + ' "usmHMAC128SHA224AuthProtocol": "authkey17",' + ' "usmDESPrivProtocol": "privkey17"' + ' },' + ' {' + ' "user": "user18",' + ' "engineId": "8000000000000018",' + ' "usmHMAC128SHA224AuthProtocol": "authkey18",' + ' "usm3DESEDEPrivProtocol": "privkey18"' + ' },' + ' {' + ' "user": "user19",' + ' "engineId": "8000000000000019",' + ' "usmHMAC128SHA224AuthProtocol": "authkey19",' + ' "usmAesCfb128Protocol": "privkey19"' + ' },' + ' {' + ' "user": "user20",' + ' "engineId": "8000000000000020",' + ' "usmHMAC128SHA224AuthProtocol": "authkey20",' + ' "usmAesBlumenthalCfb192Protocol": "privkey20"' + ' },' + ' {' + ' "user": "user21",' + ' "engineId": "8000000000000021",' + ' "usmHMAC128SHA224AuthProtocol": "authkey21",' + ' "usmAesBlumenthalCfb256Protocol": "privkey21"' + ' },' + ' {' + ' "user": "user22",' + ' "engineId": "8000000000000022",' + ' "usmHMAC128SHA224AuthProtocol": "authkey22",' + ' "usmAesCfb192Protocol": "privkey22"' + ' },' + ' {' + ' "user": "user23",' + ' "engineId": "8000000000000023",' + ' "usmHMAC128SHA224AuthProtocol": "authkey23",' + ' "usmAesCfb256Protocol": "privkey23"' + ' },' + ' {' + ' "user": "user25",' + ' "engineId": "8000000000000025",' + ' "usmHMAC192SHA256AuthProtocol": "authkey25",' + ' "usmDESPrivProtocol": "privkey25"' + ' },' + ' {' + ' "user": "user26",' + ' "engineId": "8000000000000026",' + ' "usmHMAC192SHA256AuthProtocol": "authkey26",' + ' "usm3DESEDEPrivProtocol": "privkey26"' + ' },' + ' {' + ' "user": "user27",' + ' "engineId": "8000000000000027",' + ' "usmHMAC192SHA256AuthProtocol": "authkey27",' + ' "usmAesCfb128Protocol": "privkey27"' + ' },' + ' {' + ' "user": "user28",' + ' "engineId": "8000000000000028",' + ' "usmHMAC192SHA256AuthProtocol": "authkey28",' + ' "usmAesBlumenthalCfb192Protocol": "privkey28"' + ' },' + ' {' + ' "user": "user29",' + ' "engineId": "8000000000000029",' + ' "usmHMAC192SHA256AuthProtocol": "authkey29",' + ' "usmAesBlumenthalCfb256Protocol": "privkey29"' + ' },' + ' {' + ' "user": "user30",' + ' "engineId": "8000000000000030",' + ' "usmHMAC192SHA256AuthProtocol": "authkey30",' + ' "usmAesCfb192Protocol": "privkey30"' + ' },' + ' {' + ' "user": "user31",' + ' "engineId": "8000000000000031",' + ' "usmHMAC192SHA256AuthProtocol": "authkey31",' + ' "usmAesCfb256Protocol": "privkey31"' + ' },' + ' {' + ' "user": "user33",' + ' "engineId": "8000000000000033",' + ' "usmHMAC256SHA384AuthProtocol": "authkey33",' + ' "usmDESPrivProtocol": "privkey33"' + ' },' + ' {' + ' "user": "user34",' + ' "engineId": "8000000000000034",' + ' "usmHMAC256SHA384AuthProtocol": "authkey34",' + ' "usm3DESEDEPrivProtocol": "privkey34"' + ' },' + ' {' + ' "user": "user35",' + ' "engineId": "8000000000000035",' + ' "usmHMAC256SHA384AuthProtocol": "authkey35",' + ' "usmAesCfb128Protocol": "privkey35"' + ' },' + ' {' + ' "user": "user36",' + ' "engineId": "8000000000000036",' + ' "usmHMAC256SHA384AuthProtocol": "authkey36",' + ' "usmAesBlumenthalCfb192Protocol": "privkey36"' + ' },' + ' {' + ' "user": "user37",' + ' "engineId": "8000000000000037",' + ' "usmHMAC256SHA384AuthProtocol": "authkey37",' + ' "usmAesBlumenthalCfb256Protocol": "privkey37"' + ' },' + ' {' + ' "user": "user38",' + ' "engineId": "8000000000000038",' + ' "usmHMAC256SHA384AuthProtocol": "authkey38",' + ' "usmAesCfb192Protocol": "privkey38"' + ' },' + ' {' + ' "user": "user39",' + ' "engineId": "8000000000000039",' + ' "usmHMAC256SHA384AuthProtocol": "authkey39",' + ' "usmAesCfb256Protocol": "privkey39"' + ' },' + ' {' + ' "user": "user41",' + ' "engineId": "8000000000000041",' + ' "usmHMAC384SHA512AuthProtocol": "authkey41",' + ' "usmDESPrivProtocol": "privkey41"' + ' },' + ' {' + ' "user": "user42",' + ' "engineId": "8000000000000042",' + ' "usmHMAC384SHA512AuthProtocol": "authkey42",' + ' "usm3DESEDEPrivProtocol": "privkey42"' + ' },' + ' {' + ' "user": "user43",' + ' "engineId": "8000000000000043",' + ' "usmHMAC384SHA512AuthProtocol": "authkey43",' + ' "usmAesCfb128Protocol": "privkey43"' + ' },' + ' {' + ' "user": "user44",' + ' "engineId": "8000000000000044",' + ' "usmHMAC384SHA512AuthProtocol": "authkey44",' + ' "usmAesBlumenthalCfb192Protocol": "privkey44"' + ' },' + ' {' + ' "user": "user45",' + ' "engineId": "8000000000000045",' + ' "usmHMAC384SHA512AuthProtocol": "authkey45",' + ' "usmAesBlumenthalCfb256Protocol": "privkey45"' + ' },' + ' {' + ' "user": "user46",' + ' "engineId": "8000000000000046",' + ' "usmHMAC384SHA512AuthProtocol": "authkey46",' + ' "usmAesCfb192Protocol": "privkey46"' + ' },' + ' {' + ' "user": "user47",' + ' "engineId": "8000000000000047",' + ' "usmHMAC384SHA512AuthProtocol": "authkey47",' + ' "usmAesCfb256Protocol": "privkey47"' + ' },' + ' {' + ' "user": "user48",' + ' "engineId": "8000000000000048",' + ' "usmNoAuthProtocol": "authkey48",' + ' "usmNoPrivProtocol": "privkey48"' + ' },' + ' {' + ' "user": "user49",' + ' "engineId": "8000000000000049",' + ' "unknownAuthProtocol": "authkey49",' + ' "unknownProtocol": "privkey49"' + ' }' + ' ]' + ' }' + ) + JSON_MISSING_USER = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "baduser": "user50",' + ' "engineId": "8000000000000050",' + ' "unknownAuthProtocol": "authkey50",' + ' "unknownProtocol": "privkey50"' + ' }' + ' ]' + ' }' + ) + JSON_MISSING_ENGINE = ( + ' "snmpv3_config": {' + ' "usm_users": [' + ' {' + ' "user": "user51",' + ' "badengineId": "8000000000000051",' + ' "unknownAuthProtocol": "authkey51",' + ' "unknownProtocol": "privkey51"' + ' }' + ' ]' + ' }' + ) + JSON_COMMA = ',' + JSON_END = '}' + + @classmethod + def setUpClass(cls): + tds.init() + + def test_v3_config_present(self): """ Test that snmpv3 config is present """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" }, { "user": "user4"} , { "engineId": "1"}, { "user": "user6", "engineId": "8000000000000006", "usmNoAuth": "authkey3", "usmAesCfb192": "privkey6" }, { "user": "user7", "engineId": "8000000000000007", "usmNoAuth": "", "usmNoPriv": "" }] } }' + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_USERS + + test_snmpv3_config.JSON_END ) + tds.c_config = json.loads(pconfig) - # del os.environ['CBS_SIM_JSON'] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) - with pytest.raises(Exception) as pytest_wrapped_sys_exit: - snmp_engine = engine.SnmpEngine() - result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) - # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3) - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 def test_v3_config_not_present(self): """ Test that app is ok if v3 config not present """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } } }' - ) + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) - # del os.environ['CBS_SIM_JSON'] - # result = trapd_get_cbs_config.get_cbs_config() - # print("result: %s" % result) - # compare = str(result).startswith("{'snmptrap': ") - # self.assertEqual(compare, False) + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) + + + @unittest.skip("need to understand what happens when a username is missing") + def test_v3_config_missing_user(self): + """ + Test that app is ok if v3 config has a missing user name + """ + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_MISSING_USER + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) + + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) + + + def test_v3_config_missing_engine(self): + """ + Test that app is ok if v3 config has a missing engine name + """ + pconfig = ( + test_snmpv3_config.JSON_START + + test_snmpv3_config.JSON_COMMA + + test_snmpv3_config.JSON_MISSING_ENGINE + + test_snmpv3_config.JSON_END + ) + tds.c_config = json.loads(pconfig) - with pytest.raises(Exception) as pytest_wrapped_sys_exit: - snmp_engine = engine.SnmpEngine() - result = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) - # config, snmp_engine=trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, pytest_json_data_with_v3) - assert pytest_wrapped_sys_exit.type == SystemExit - # assert pytest_wrapped_sys_exit.value.code == 1 + snmp_engine = engine.SnmpEngine() + rconfig, rsnmp_engine = trapd_snmpv3.load_snmpv3_credentials(config, snmp_engine, tds.c_config) + self.assertEqual(rsnmp_engine, snmp_engine) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py index 16b0a17..3041884 100644 --- a/tests/test_trapd_stormwatch.py +++ b/tests/test_trapd_stormwatch.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,14 +14,15 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit import time +from unittest.mock import patch import trapd_stormwatch as sw import trapd_stormwatch_settings as sws import trapd_stats_settings as stats +import trapd_settings as tds class test_cleanup_and_exit(unittest.TestCase): @@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase): Test for presense of required vars """ - def test_increment_existing_counter(self): - """ - Test increment counter - """ - sw.sw_init() + @classmethod + def setUp(cls): + tds.init() stats.init() + sw.sw_init() + sws.init() - oid = ".1.2.3.4.5.6" - sws.sw_config_oid_dict[oid] = True - sws.sw_config_low_water_in_interval_dict[oid] = 1 - sws.sw_config_high_water_in_interval_dict[oid] = 10 - - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False - - self.assertEqual(result, True) - - # try again, but with stats.total_notifications removed - delattr(stats, "total_notifications") - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False + def test_sw_init(self): + """ test sw_init() """ + sw.sw_init() + self.assertEqual(sws.sw_interval_in_seconds, 60) + sws.init() + self.assertEqual(sws.sw_config_category, {}) - self.assertEqual(result, True) def test_sw_clear_dicts(self): """ Test sw_clear_dicts """ - sw.sw_init() - # initialize attributes not handled by sw_init() - sws.sw_storm_counter_dict = {} - stats.agent_counter_dict = {} - stats.oid_counter_dict = {} - sws.sw_config_category = {} - # provide a value that can tested for - sws.sw_storm_counter_dict["abc"] = "def" + with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'): + with patch.dict('trapd_stormwatch_settings.sw_config_category'): + with patch.dict('trapd_stats_settings.agent_counter_dict'): + with patch.dict('trapd_stats_settings.oid_counter_dict'): + + # initialize attributes not handled by sw_init() + sws.sw_storm_counter_dict = {} + sws.sw_config_category = {} + stats.agent_counter_dict = {} + stats.oid_counter_dict = {} + + # provide a value that can tested for + sws.sw_storm_counter_dict["abc"] = "def" + self.assertTrue("abc" in sws.sw_storm_counter_dict) + + self.assertTrue(sw.sw_clear_dicts()) + self.assertFalse("abc" in sws.sw_storm_counter_dict) - sw.sw_clear_dicts() - self.assertFalse("abc" in sws.sw_storm_counter_dict) + # now make sure we get an exception + sws.sw_config_category = 3 + self.assertFalse(sw.sw_clear_dicts()) - # now make sure we get an exception - sws.sw_config_category = 3 - self.assertFalse(sw.sw_clear_dicts()) - # clean up the attributes we added above - delattr(sws, "sw_storm_counter_dict") - delattr(stats, "agent_counter_dict") - delattr(stats, "oid_counter_dict") - delattr(sws, "sw_config_category") + def test_sw_load_trap_config(self): + """ Test sw_load_trap_config(_config) """ + trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "trap_config": { }, + "notify OID len": 10, + } + + # normal operation, and variations + ret1 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret1, 0) + self.assertIsInstance(ret1, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { "oidx": "1.3.6.1.4.1.9999.9.9.888" } + ] + ret2 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret2, 0) + self.assertIsInstance(ret2, int) + + trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33 + ret3 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret3, 0) + self.assertIsInstance(ret3, int) + + trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50 + ret4 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret4, 0) + self.assertIsInstance(ret4, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { + "oid": "1.3.6.1.4.1.9999.9.9.888", + "sw_high_water_in_interval": 3, + "sw_low_water_in_interval": 2, + "category": "abc", + } + ] + ret5 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret5, 1) + self.assertIsInstance(ret5, int) + + delattr(sws, 'sw_storm_active_dict') + ret6 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret6, 1) + self.assertIsInstance(ret6, int) + def test_sw_log_metrics(self): """ Test sw_clear_log_metrics """ - sw.sw_init() - stats.total_notifications = 3 stats.total_notifications = 50 sws.sw_interval_in_seconds = 30 stats.agent_counter_dict = {"a": 3, "b": 40} stats.metric_log_notification_threshold_pct = 30 - sw.sw_log_metrics() - # make sure we got this far - assert True + with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger: + sw.sw_log_metrics() + self.assertEqual(magic_ecomp_logger.call_count, 3) + + + def test_increment_existing_counter(self): + """ + Test increment counter. There should NOT be an exception. + """ + oid = ".1.2.3.4.5.6" + sws.sw_config_oid_dict[oid] = True + sws.sw_config_low_water_in_interval_dict[oid] = 1 + sws.sw_config_high_water_in_interval_dict[oid] = 10 + + loc_agent = "192.168.1.1" + stats.agent_counter_dict[loc_agent] = 2 + stats.oid_counter_dict[oid] = 102 + + sv_total_notifications = stats.total_notifications + sv_agent_count_dict = stats.agent_counter_dict[loc_agent] + sv_oid_count_dict = stats.oid_counter_dict[oid] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+1) + self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1) + + # try again, without agent_counter_dict[loc_agent] + del stats.agent_counter_dict[loc_agent] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+2) + self.assertEqual(stats.agent_counter_dict[loc_agent], 1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2) + + # try again, but with stats.total_notifications removed + delattr(stats, "total_notifications") + + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except: + result = False + + self.assertEqual(result, True, "stats.total_notifications removed") + self.assertEqual(stats.total_notifications, 1) + self.assertEqual(stats.agent_counter_dict[loc_agent], 2) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3) + def test_sw_storm_active(self): """ Test sw_storm_active() """ - sw.sw_init() + print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}") + # initialize attributes not handled by sw_init() stats.oid_counter_dict = {} @@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase): self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) # now force sws.sw_last_stormwatch_dict_analysis to an old value - sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20 + sws.sw_last_stormwatch_dict_analysis = ( + int(time.time()) - sws.sw_interval_in_seconds - 20 + ) # and make certain that stats.oid_counter_dict got cleared. if not hasattr(stats, "oid_counter_dict"): stats.oid_counter_dict = {} @@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase): # .get("abc") != None) -if __name__ == "__main__": + @patch('trapd_stormwatch.ecomp_logger') + def test_sw_reset_counter_dict(self, magic_ecomp_logger): + """ test sw_reset_counter_dict() """ + + loc_agent = "192.168.1.1" + loc_oid = ".1.2.3.4.5.6" + loc_agent_oid = loc_agent + " " + loc_oid + + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 2) + + # cases: + # for lao in storm_active_dict: + # storm_counter_dict[lao] >= high_water_val[lo] + # storm_counter_dict[lao] < high_water_val[lo] + # storm_counter_dict[lao] < low_water_val[lo] + # storm_counter_dict[lao] >= low_water_val[lo] + + with patch.dict(sws.sw_storm_counter_dict, { + loc_agent_oid: 20 + }): + # values around the 20 above + for high_water_mark in [2, 60]: + # values around the 20 above + for low_water_mark in [0, 30]: + with patch.dict(sws.sw_config_high_water_in_interval_dict, { + loc_oid: high_water_mark + }): + with patch.dict(sws.sw_config_low_water_in_interval_dict, { + loc_oid: low_water_mark + }): + with patch.dict(sws.sw_storm_active_dict, { + loc_agent_oid: "anything" + }): + sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid] + magic_ecomp_logger.call_count = 0 + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 3) + self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0) + + +if __name__ == "__main__": # pragma: no cover # sws.init() unittest.main() diff --git a/tests/test_trapd_stormwatch_settings.py b/tests/test_trapd_stormwatch_settings.py index bcb04a7..f5ad9a7 100644 --- a/tests/test_trapd_stormwatch_settings.py +++ b/tests/test_trapd_stormwatch_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit @@ -34,26 +33,16 @@ class test_cleanup_and_exit(unittest.TestCase): Test nosuch var """ sws.init() - try: - sws.no_such_var - result = True - except: - result = False + self.assertFalse(hasattr(sws, 'no_such_var')) - self.assertEqual(result, False) def test_storm_counter_dict(self): """ Test storm_counter_dict """ sws.init() - try: - sws.sw_storm_counter_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_storm_counter_dict')) - self.assertEqual(result, True) def test_storm_active_dict(self): """ @@ -61,13 +50,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_storm_active_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_storm_active_dict')) - self.assertEqual(result, True) def test_sw_config_oid_dict(self): """ @@ -75,13 +59,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_oid_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_oid_dict')) - self.assertEqual(result, True) def test_sw_config_low_water_in_interval_dict(self): """ @@ -89,13 +68,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_low_water_in_interval_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_low_water_in_interval_dict')) - self.assertEqual(result, True) def test_sw_config_high_water_in_interval_dict(self): """ @@ -103,13 +77,8 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_high_water_in_interval_dict - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_high_water_in_interval_dict')) - self.assertEqual(result, True) def test_sw_config_category(self): """ @@ -117,43 +86,26 @@ class test_cleanup_and_exit(unittest.TestCase): """ sws.init() - try: - sws.sw_config_category - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_config_category')) - self.assertEqual(result, True) def test_sw_interval_in_seconds(self): """ Test sw_interval """ - sws.init() - try: - str(sws.sw_interval_in_seconds).isnumeric() - result = True - except: - result = False + self.assertTrue(hasattr(sws, 'sw_interval_in_seconds')) + self.assertTrue(str(sws.sw_interval_in_seconds).isnumeric()) - self.assertEqual(result, True) def test_sw_last_stormwatch_dict_analysis(self): """ Test last_stormwatch_dict_analysis """ - sws.init() - try: - str(sws.sw_last_stormwatch_dict_analysis).isnumeric() - result = True - except: - result = False - - self.assertEqual(result, True) + self.assertTrue(hasattr(sws, 'sw_last_stormwatch_dict_analysis')) + self.assertTrue(str(sws.sw_last_stormwatch_dict_analysis).isnumeric()) -if __name__ == "__main__": - # sws.init() - unittest.main() +if __name__ == "__main__": # pragma: no cover + unittest.main(verbosity=2) diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py index 5792b8c..28150c9 100644 --- a/tests/test_trapd_vb_types.py +++ b/tests/test_trapd_vb_types.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,18 +14,10 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest -import json import unittest -import os -from onap_dcae_cbs_docker_client.client import get_config -from trapd_exit import cleanup_and_exit -from trapd_io import stdout_logger, ecomp_logger -import trapd_settings as tds import trapd_vb_types - -from pysnmp.entity import engine, config +import trapd_settings as tds class test_trapd_vb_types(unittest.TestCase): @@ -33,121 +25,102 @@ class test_trapd_vb_types(unittest.TestCase): Test snmpv3 module """ - good_varbind_types = [ - "Integer", - "Unsigned32", - "Counter32", - "OctetString", - "ObjectIdentifier", - "TimeTicks", - "IpAddress", - ] + @classmethod + def setUpClass(cls): + tds.init() - def trapd_vb_type_conversion_integer(self): + + def test_trapd_vb_type_conversion_integer32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32"), "integer") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer32") - self.assertEqual(result, "integer") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_integer(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer"), "integer") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Integer") - self.assertEqual(result, "integer") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_gauge32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32"), "unsigned") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Gauge32") - self.assertEqual(result, "unsigned") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_counter32(self): """ Test that pysnmp varbind types Integer converts """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32"), "counter32") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Counter32") - self.assertEqual(result, "counter32") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_octetstring(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString"), "octet") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("OctetString") - self.assertEqual(result, "octet") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_py_type_5(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5"), "hex") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_5") - self.assertEqual(result, "hex") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_py_type_6(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6"), "decimal") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("py_type_6") - self.assertEqual(result, "decimal") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_null(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null"), "null") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Null") - self.assertEqual(result, "null") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_objectidentifier(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier"), "oid") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("ObjectIdentifier") - self.assertEqual(result, "oid") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_timeticks(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks"), "timeticks") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("TimeTicks") - self.assertEqual(result, "timeticks") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_ipaddress(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress"), "ipaddress") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("IpAddress") - self.assertEqual(result, "ipaddress") - def trapd_vb_type_conversion_integer(self): + def test_trapd_vb_type_conversion_bits(self): """ Test that pysnmp varbind types convert accurately """ + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits"), "bits") - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("Bits") - self.assertEqual(result, "bits") - def trapd_vb_type_conversion_invalid(self): + def test_trapd_vb_type_conversion_invalid(self): """ Test that pysnmp varbind types convert accurately """ - - result = trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType") # should return default of octet if not defined - self.assertEqual(result, "octet") + self.assertEqual(trapd_vb_types.pysnmp_to_netsnmp_varbind_convert("noSuchVarbindType"), "octet") -if __name__ == "__main__": - unittest.main() +if __name__ == "__main__": # pragma: no cover + unittest.main(verbosity=2) |