diff options
Diffstat (limited to 'tests/test_snmptrapd.py')
-rw-r--r-- | tests/test_snmptrapd.py | 610 |
1 files changed, 505 insertions, 105 deletions
diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py index dee2aa0..736e114 100644 --- a/tests/test_snmptrapd.py +++ b/tests/test_snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,17 @@ # limitations under the License. # ============LICENSE_END========================================================= +import copy +import datetime import os -import pytest import unittest +from pathlib import Path +import time +from unittest.mock import patch, Mock + +import requests + import snmptrapd -import datetime import trapd_settings as tds import trapd_stormwatch_settings as sws @@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase): Test the save_pid mod """ - pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' + class WriteThrows(): + def write(): + raise RuntimeError("write() throws") + + + @classmethod + def setUpClass(cls): + + # init vars + tds.init() + sw.sw_init() + + # fmt: off + test_snmptrapd.pytest_empty_data = "{}" + test_snmptrapd.pytest_json_data = ( + '{ "snmptrapd": { ' + ' "version": "2.0.3", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + ' "protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + ' "cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + ' "publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + ' "streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + ' "files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + ' "trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + ' "snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + ' ] } }' + ) + # fmt: off + + test_snmptrapd.trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "notify OID len": 10, + } + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # create copy of snmptrapd.json for pytest + test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" + with open(test_snmptrapd.pytest_json_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_json_data) + + test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json" + with open(test_snmptrapd.pytest_empty_config, "w") as outfile: + outfile.write(test_snmptrapd.pytest_empty_data) - # create copy of snmptrapd.json for pytest - pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" - with open(pytest_json_config, "w") as outfile: - outfile.write(pytest_json_data) def test_usage_err(self): """ Test usage error """ - with pytest.raises(SystemExit) as pytest_wrapped_sys_exit: - result = snmptrapd.usage_err() - assert pytest_wrapped_sys_exit.type == SystemExit - assert pytest_wrapped_sys_exit.value.code == 1 + with self.assertRaises(SystemExit) as exc: + snmptrapd.usage_err() + self.assertEqual(str(exc.exception), "1") + def test_load_all_configs(self): """ Test load of all configs """ + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) - # init vars - tds.init() - sw.sw_init() + result = trapd_get_cbs_config.get_cbs_config() + self.assertEqual(result, True) - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + # request load of CBS data + self.assertEqual(snmptrapd.load_all_configs(0, 1), True) + + + def test_resolve_ip(self): + """ Test resolve_ip """ + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + time_base = 1000000 + time_offset = 10000 + with patch('time.time', return_value=time_base): + self.assertEqual(time.time(), time_base) + + fqdn = "foo.example" + ip = "1.2.3.4" + with patch.dict(tds.dns_cache_ip_to_name): + tds.dns_cache_ip_to_name = { } + # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires + # and gethostbyaddr() fails + with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")): + self.assertEqual(snmptrapd.resolve_ip(ip), ip) + self.assertEqual(tds.dns_cache_ip_to_name[ip], ip) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + tds.dns_cache_ip_to_name = { ip: fqdn } + with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])): + # EXCEPTION - nothing in tds.dns_cache_ip_expires + del tds.dns_cache_ip_expires[ip] + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) + + + with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}): + self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) + self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) + self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset) - # request load of CBS data - result = snmptrapd.load_all_configs(0, 1) - self.assertEqual(result, True) def test_load_all_configs_signal(self): """ @@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - self.assertEqual(result, True) + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + self.assertTrue(trapd_get_cbs_config.get_cbs_config()) + + # request load of CBS data + self.assertTrue(snmptrapd.load_all_configs(1, 1)) + + with patch('snmptrapd.get_cbs_config', return_value=False): + with self.assertRaises(SystemExit): + snmptrapd.load_all_configs(1, 1) - # request load of CBS data - result = snmptrapd.load_all_configs(1, 1) - self.assertEqual(result, True) def test_log_all_arriving_traps(self): """ Test logging of traps """ - # init vars tds.init() - # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - result = trapd_get_cbs_config.get_cbs_config() - - # set last day to current - tds.last_day = datetime.datetime.now().day - - # trap dict for logging - tds.trap_dict = { - "uuid": "06f6e91c-3236-11e8-9953-005056865aac", - "agent address": "1.2.3.4", - "agent name": "test-agent.nodomain.com", - "cambria.partition": "test-agent.nodomain.com", - "community": "", - "community len": 0, - "epoch_serno": 15222068260000, - "protocol version": "v2c", - "time received": 1522206826.2938566, - "trap category": "ONAP-COLLECTOR-SNMPTRAP", - "sysUptime": "218567736", - "notify OID": "1.3.6.1.4.1.9999.9.9.999", - "notify OID len": 10, - } + # don't open files, but try to log - should raise exception + with self.assertRaises(Exception) as exc: + snmptrapd.log_all_arriving_traps() + self.assertIsInstance(exc.exception, TypeError) - # open eelf logs - trapd_io.open_eelf_logs() - - # open trap logs - tds.arriving_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + (tds.c_config["files"]["arriving_traps_log"]) - ) - tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) - - # name and open json trap log - tds.json_traps_filename = ( - tds.c_config["files"]["runtime_base_dir"] - + "/" - + tds.c_config["files"]["log_dir"] - + "/" - + "DMAAP_" - + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) - + ".json" - ) - tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) - msg = "published traps logged to: %s" % tds.json_traps_filename - trapd_io.stdout_logger(msg) - trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # request load of CBS data + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + result = trapd_get_cbs_config.get_cbs_config() + + # set last day to current + tds.last_day = datetime.datetime.now().day + + + # open eelf logs + trapd_io.open_eelf_logs() + + # open trap logs + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + (tds.c_config["files"]["arriving_traps_log"]) + ) + tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) + + # name and open json trap log + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + + "/" + + tds.c_config["files"]["log_dir"] + + "/" + + "DMAAP_" + + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) + + ".json" + ) + tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) + msg = "published traps logged to: %s" % tds.json_traps_filename + trapd_io.stdout_logger(msg) + trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + # also force it to daily roll + snmptrapd.log_all_arriving_traps() + + # try again, but with day rolling + tds.last_day = datetime.datetime.now().day - 1 + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to minute + tds.last_minute = datetime.datetime.now().minute - 1 + tds.c_config["files"]["roll_frequency"] = "minute" + snmptrapd.log_all_arriving_traps() + + # try again, with roll_frequency set to hour + tds.last_hour = datetime.datetime.now().hour - 1 + tds.c_config["files"]["roll_frequency"] = "hour" + snmptrapd.log_all_arriving_traps() + + # try again, with a bad trap_dict[time_received] + tds.trap_dict["time received"] = "bad_value" + snmptrapd.log_all_arriving_traps() + + # also test log_published_messages() + snmptrapd.log_published_messages("data #1") + + # work even if there is an exception + # this SHOULD be done with a context + sv_json_traps_fd = tds.json_traps_fd + tds.json_traps_fd = test_snmptrapd.WriteThrows() + snmptrapd.log_published_messages("data #2") + tds.json_traps_fd = sv_json_traps_fd - # don't open files, but try to log - should raise exception - with pytest.raises(Exception) as pytest_wrapped_exception: - result = snmptrapd.log_all_arriving_traps() - assert pytest_wrapped_exception.type == AttributeError def test_log_all_incorrect_log_type(self): """ @@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + # open eelf logs + trapd_io.open_eelf_logs() - # open eelf logs - trapd_io.open_eelf_logs() def test_v1_trap_receipt(self): """ @@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase): tds.init() # request load of CBS data - os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json") - trapd_get_cbs_config.get_cbs_config() - - errorIndication, errorStatus, errorIndex, varbinds = next( - sendNotification( - SnmpEngine(), - CommunityData("not_public"), - UdpTransportTarget(("localhost", 6162)), - ContextData(), - "trap", - [ - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), - ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), - ], + with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): + self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) + + trapd_get_cbs_config.get_cbs_config() + + errorIndication, errorStatus, errorIndex, varbinds = next( + sendNotification( + SnmpEngine(), + CommunityData("not_public"), + UdpTransportTarget(("localhost", 6162)), + ContextData(), + "trap", + [ + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), + ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), + ], + ) ) - ) - result = errorIndication - self.assertEqual(result, None) + result = errorIndication + self.assertEqual(result, None) + + + def test_post_dmaap(self): + """ + Test post_dmaap() + """ + + # trap dict for logging + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger: + with patch('requests.Session.post') as magic_session_post: + fake_post_resp = Mock() + magic_session_post.return_value = fake_post_resp + + fake_post_resp.status_code = requests.codes.moved_permanently + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 11) + + fake_post_resp.status_code = requests.codes.ok + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 16) + + magic_ecomp_logger.call_count = 0 + tds.traps_since_last_publish = 1 + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + magic_ecomp_logger.call_count = 0 + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username" + tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password" + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 5) + + # for some reason this exception is being seen as an OSError ???? + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = requests.exceptions.RequestException("test throw") + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + magic_ecomp_logger.call_count = 0 + magic_session_post.side_effect = OSError() + snmptrapd.post_dmaap() + self.assertEqual(magic_ecomp_logger.call_count, 10) + + + @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method") + def test_comm_string_rewrite_observer(self): + """ + test comm_string_rewrite_observer() + """ + vars = { "communityName": ["name"] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "public") + + vars = { "communityName": [] } + snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") + assertEqual(vars["communitName"], "") + + + def test_snmp_engine_observer_cb(self): + """ + test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): + """ + + snmp_engine = "snmp_engine" + execpoint = "execpoint" + cbCtx = "cbCtx" + variables = { + 'transportDomain': [ 1, 2, 3 ], + 'transportAddress': [ "a", "b", "c" ] + } + for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]: + variables["securityModel"] = secmodel + snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx) + self.assertEqual(tds.trap_dict["protocol version"], ret) -if __name__ == "__main__": + def test_add_varbind_to_log_string(self): + """ + test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val) + """ + vb_oid = "vb_oid" + vb_type = "vb_type" + + class TempPP(): + def prettyPrint(self): + return "pp ret" + + vb_val = TempPP() + + self.assertEqual(tds.all_vb_str, "") + + snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret') + + snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val) + self.assertEqual(tds.all_vb_str, + 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret') + + + def test_add_varbind_to_json(self): + """ + test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val) + """ + + class TempPP(): + def __init__(self, ret): + self.ret = ret + def prettyPrint(self): + return self.ret + + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + vb_oid = TempPP("1.2.3") + override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0") + + vb_type = "vb_type" + + vb_val = TempPP("foo.example") + + self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["notify OID"], ".foo.example") + self.assertEqual(tds.trap_dict["notify OID len"], 2) + + with patch('snmptrapd.resolve_ip') as magic_resolve_ip: + magic_resolve_ip.return_value = 'foo.example' + self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0) + self.assertEqual(tds.trap_dict["agent address"], "foo.example") + self.assertEqual(tds.trap_dict["agent name"], "foo.example") + + sv_protocol_version = tds.trap_dict["protocol version"] + tds.trap_dict["protocol version"] = "v1" + self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0) + self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1) + + tds.trap_dict["protocol version"] = sv_protocol_version + self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1) + self.assertEqual(tds.all_vb_json_str, + ', "varbinds": [{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"} ,{"varbind_oid": ".1.2.3", ' + '"varbind_type": "octet", "varbind_value": ' + '"foo.example"}') + + + @patch('snmptrapd.log_all_arriving_traps') + @patch('snmptrapd.post_dmaap', return_value = 0) + @patch('snmptrapd.ecomp_logger', return_value = 0) + @patch('snmptrapd.add_varbind_to_json', return_value = 1) + @patch('snmptrapd.add_varbind_to_log_string', return_value = 0) + def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json, + magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps): + """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """ + with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): + with patch('trapd_stormwatch.sw_storm_active', return_value=True): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertFalse(tds.first_trap) + self.assertEqual(magic_ecomp_logger.call_count, 8) + + magic_ecomp_logger.call_count = 0 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 1 + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.c_config["publisher"]["max_traps_between_publishes"] = 100 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0 + tds.last_pub_time = 0 + with patch('time.time', return_value=0): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + magic_ecomp_logger.call_count = 0 + tds.last_pub_time = 100000 + tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1 + with patch('time.time', return_value=10): + with patch('trapd_stormwatch.sw_storm_active', return_value=False): + snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") + self.assertEqual(magic_ecomp_logger.call_count, 4) + + +if __name__ == "__main__": # pragma: no cover unittest.main() |