aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_snmptrapd.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_snmptrapd.py')
-rw-r--r--tests/test_snmptrapd.py610
1 files changed, 505 insertions, 105 deletions
diff --git a/tests/test_snmptrapd.py b/tests/test_snmptrapd.py
index dee2aa0..736e114 100644
--- a/tests/test_snmptrapd.py
+++ b/tests/test_snmptrapd.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,11 +14,17 @@
# limitations under the License.
# ============LICENSE_END=========================================================
+import copy
+import datetime
import os
-import pytest
import unittest
+from pathlib import Path
+import time
+from unittest.mock import patch, Mock
+
+import requests
+
import snmptrapd
-import datetime
import trapd_settings as tds
import trapd_stormwatch_settings as sws
@@ -37,40 +43,207 @@ class test_snmptrapd(unittest.TestCase):
Test the save_pid mod
"""
- pytest_json_data = '{ "snmptrapd": { "version": "2.0.3", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }'
+ class WriteThrows():
+ def write():
+ raise RuntimeError("write() throws")
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ # init vars
+ tds.init()
+ sw.sw_init()
+
+ # fmt: off
+ test_snmptrapd.pytest_empty_data = "{}"
+ test_snmptrapd.pytest_json_data = (
+ '{ "snmptrapd": { '
+ ' "version": "2.0.3", '
+ ' "title": "ONAP SNMP Trap Receiver" }, '
+ ' "protocols": { '
+ ' "transport": "udp", '
+ ' "ipv4_interface": "0.0.0.0", '
+ ' "ipv4_port": 6162, '
+ ' "ipv6_interface": "::1", '
+ ' "ipv6_port": 6162 }, '
+ ' "cache": { '
+ ' "dns_cache_ttl_seconds": 60 }, '
+ ' "publisher": { '
+ ' "http_timeout_milliseconds": 1500, '
+ ' "http_retries": 3, '
+ ' "http_milliseconds_between_retries": 750, '
+ ' "http_primary_publisher": "true", '
+ ' "http_peer_publisher": "unavailable", '
+ ' "max_traps_between_publishes": 10, '
+ ' "max_milliseconds_between_publishes": 10000 }, '
+ ' "streams_publishes": { '
+ ' "sec_fault_unsecure": { '
+ ' "type": "message_router", '
+ ' "aaf_password": null, '
+ ' "dmaap_info": { '
+ ' "location": "mtl5", '
+ ' "client_id": null, '
+ ' "client_role": null, '
+ ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
+ ' "aaf_username": null } }, '
+ ' "files": { '
+ ' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
+ ' "log_dir": "logs", '
+ ' "data_dir": "data", '
+ ' "pid_dir": "tmp", '
+ ' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
+ ' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
+ ' "traps_stats_log": "snmptrapd_stats.csv", '
+ ' "perm_status_file": "snmptrapd_status.log", '
+ ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
+ ' "eelf_error": "error.log", '
+ ' "eelf_debug": "debug.log", '
+ ' "eelf_audit": "audit.log", '
+ ' "eelf_metrics": "metrics.log", '
+ ' "roll_frequency": "day", '
+ ' "minimum_severity_to_log": 2 }, '
+ ' "trap_config": { '
+ ' "sw_interval_in_seconds": 60, '
+ ' "notify_oids": { '
+ ' ".1.3.6.1.4.1.9.0.1": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.2": { '
+ ' "sw_high_water_in_interval": 101, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.3": { '
+ ' "sw_high_water_in_interval": 102, '
+ ' "sw_low_water_in_interval": 7, '
+ ' "category": "logonly" }, '
+ ' ".1.3.6.1.4.1.9.0.4": { '
+ ' "sw_high_water_in_interval": 10, '
+ ' "sw_low_water_in_interval": 3, '
+ ' "category": "logonly" } } }, '
+ ' "snmpv3_config": { '
+ ' "usm_users": [ { '
+ ' "user": "usr-sha-aes256", '
+ ' "engineId": "8000000001020304", '
+ ' "usmHMACSHAAuth": "authkey1", '
+ ' "usmAesCfb256": "privkey1" }, '
+ ' { "user": "user1", '
+ ' "engineId": "8000000000000001", '
+ ' "usmHMACMD5Auth": "authkey1", '
+ ' "usmDESPriv": "privkey1" }, '
+ ' { "user": "user2", '
+ ' "engineId": "8000000000000002", '
+ ' "usmHMACSHAAuth": "authkey2", '
+ ' "usmAesCfb128": "privkey2" }, '
+ ' { "user": "user3", '
+ ' "engineId": "8000000000000003", '
+ ' "usmHMACSHAAuth": "authkey3", '
+ ' "usmAesCfb256": "privkey3" } '
+ ' ] } }'
+ )
+ # fmt: off
+
+ test_snmptrapd.trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "notify OID len": 10,
+ }
+
+ snmptrap_dir = "/tmp/opt/app/snmptrap"
+ try:
+ Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
+ Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
+ except Exception as e:
+ print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
+ sys.exit(1)
+
+ # create copy of snmptrapd.json for pytest
+ test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
+ with open(test_snmptrapd.pytest_json_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_json_data)
+
+ test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json"
+ with open(test_snmptrapd.pytest_empty_config, "w") as outfile:
+ outfile.write(test_snmptrapd.pytest_empty_data)
- # create copy of snmptrapd.json for pytest
- pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
- with open(pytest_json_config, "w") as outfile:
- outfile.write(pytest_json_data)
def test_usage_err(self):
"""
Test usage error
"""
- with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
- result = snmptrapd.usage_err()
- assert pytest_wrapped_sys_exit.type == SystemExit
- assert pytest_wrapped_sys_exit.value.code == 1
+ with self.assertRaises(SystemExit) as exc:
+ snmptrapd.usage_err()
+ self.assertEqual(str(exc.exception), "1")
+
def test_load_all_configs(self):
"""
Test load of all configs
"""
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
- # init vars
- tds.init()
- sw.sw_init()
+ result = trapd_get_cbs_config.get_cbs_config()
+ self.assertEqual(result, True)
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ # request load of CBS data
+ self.assertEqual(snmptrapd.load_all_configs(0, 1), True)
+
+
+ def test_resolve_ip(self):
+ """ Test resolve_ip """
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ time_base = 1000000
+ time_offset = 10000
+ with patch('time.time', return_value=time_base):
+ self.assertEqual(time.time(), time_base)
+
+ fqdn = "foo.example"
+ ip = "1.2.3.4"
+ with patch.dict(tds.dns_cache_ip_to_name):
+ tds.dns_cache_ip_to_name = { }
+ # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires
+ # and gethostbyaddr() fails
+ with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")):
+ self.assertEqual(snmptrapd.resolve_ip(ip), ip)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], ip)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ tds.dns_cache_ip_to_name = { ip: fqdn }
+ with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])):
+ # EXCEPTION - nothing in tds.dns_cache_ip_expires
+ del tds.dns_cache_ip_expires[ip]
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60)
+
+
+ with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}):
+ self.assertEqual(snmptrapd.resolve_ip(ip), fqdn)
+ self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn)
+ self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset)
- # request load of CBS data
- result = snmptrapd.load_all_configs(0, 1)
- self.assertEqual(result, True)
def test_load_all_configs_signal(self):
"""
@@ -81,78 +254,101 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
- self.assertEqual(result, True)
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ self.assertTrue(trapd_get_cbs_config.get_cbs_config())
+
+ # request load of CBS data
+ self.assertTrue(snmptrapd.load_all_configs(1, 1))
+
+ with patch('snmptrapd.get_cbs_config', return_value=False):
+ with self.assertRaises(SystemExit):
+ snmptrapd.load_all_configs(1, 1)
- # request load of CBS data
- result = snmptrapd.load_all_configs(1, 1)
- self.assertEqual(result, True)
def test_log_all_arriving_traps(self):
"""
Test logging of traps
"""
-
# init vars
tds.init()
- # request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- result = trapd_get_cbs_config.get_cbs_config()
-
- # set last day to current
- tds.last_day = datetime.datetime.now().day
-
- # trap dict for logging
- tds.trap_dict = {
- "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
- "agent address": "1.2.3.4",
- "agent name": "test-agent.nodomain.com",
- "cambria.partition": "test-agent.nodomain.com",
- "community": "",
- "community len": 0,
- "epoch_serno": 15222068260000,
- "protocol version": "v2c",
- "time received": 1522206826.2938566,
- "trap category": "ONAP-COLLECTOR-SNMPTRAP",
- "sysUptime": "218567736",
- "notify OID": "1.3.6.1.4.1.9999.9.9.999",
- "notify OID len": 10,
- }
+ # don't open files, but try to log - should raise exception
+ with self.assertRaises(Exception) as exc:
+ snmptrapd.log_all_arriving_traps()
+ self.assertIsInstance(exc.exception, TypeError)
- # open eelf logs
- trapd_io.open_eelf_logs()
-
- # open trap logs
- tds.arriving_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + (tds.c_config["files"]["arriving_traps_log"])
- )
- tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
-
- # name and open json trap log
- tds.json_traps_filename = (
- tds.c_config["files"]["runtime_base_dir"]
- + "/"
- + tds.c_config["files"]["log_dir"]
- + "/"
- + "DMAAP_"
- + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
- + ".json"
- )
- tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
- msg = "published traps logged to: %s" % tds.json_traps_filename
- trapd_io.stdout_logger(msg)
- trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # request load of CBS data
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ result = trapd_get_cbs_config.get_cbs_config()
+
+ # set last day to current
+ tds.last_day = datetime.datetime.now().day
+
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
+
+ # open trap logs
+ tds.arriving_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + (tds.c_config["files"]["arriving_traps_log"])
+ )
+ tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)
+
+ # name and open json trap log
+ tds.json_traps_filename = (
+ tds.c_config["files"]["runtime_base_dir"]
+ + "/"
+ + tds.c_config["files"]["log_dir"]
+ + "/"
+ + "DMAAP_"
+ + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1])
+ + ".json"
+ )
+ tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
+ msg = "published traps logged to: %s" % tds.json_traps_filename
+ trapd_io.stdout_logger(msg)
+ trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ # also force it to daily roll
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, but with day rolling
+ tds.last_day = datetime.datetime.now().day - 1
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to minute
+ tds.last_minute = datetime.datetime.now().minute - 1
+ tds.c_config["files"]["roll_frequency"] = "minute"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with roll_frequency set to hour
+ tds.last_hour = datetime.datetime.now().hour - 1
+ tds.c_config["files"]["roll_frequency"] = "hour"
+ snmptrapd.log_all_arriving_traps()
+
+ # try again, with a bad trap_dict[time_received]
+ tds.trap_dict["time received"] = "bad_value"
+ snmptrapd.log_all_arriving_traps()
+
+ # also test log_published_messages()
+ snmptrapd.log_published_messages("data #1")
+
+ # work even if there is an exception
+ # this SHOULD be done with a context
+ sv_json_traps_fd = tds.json_traps_fd
+ tds.json_traps_fd = test_snmptrapd.WriteThrows()
+ snmptrapd.log_published_messages("data #2")
+ tds.json_traps_fd = sv_json_traps_fd
- # don't open files, but try to log - should raise exception
- with pytest.raises(Exception) as pytest_wrapped_exception:
- result = snmptrapd.log_all_arriving_traps()
- assert pytest_wrapped_exception.type == AttributeError
def test_log_all_incorrect_log_type(self):
"""
@@ -163,11 +359,14 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ # open eelf logs
+ trapd_io.open_eelf_logs()
- # open eelf logs
- trapd_io.open_eelf_logs()
def test_v1_trap_receipt(self):
"""
@@ -178,26 +377,227 @@ class test_snmptrapd(unittest.TestCase):
tds.init()
# request load of CBS data
- os.environ.update(CBS_SIM_JSON="/tmp/opt/app/snmptrap/etc/snmptrapd.json")
- trapd_get_cbs_config.get_cbs_config()
-
- errorIndication, errorStatus, errorIndex, varbinds = next(
- sendNotification(
- SnmpEngine(),
- CommunityData("not_public"),
- UdpTransportTarget(("localhost", 6162)),
- ContextData(),
- "trap",
- [
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
- ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
- ],
+ with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}):
+ self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config)
+
+ trapd_get_cbs_config.get_cbs_config()
+
+ errorIndication, errorStatus, errorIndex, varbinds = next(
+ sendNotification(
+ SnmpEngine(),
+ CommunityData("not_public"),
+ UdpTransportTarget(("localhost", 6162)),
+ ContextData(),
+ "trap",
+ [
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")),
+ ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")),
+ ],
+ )
)
- )
- result = errorIndication
- self.assertEqual(result, None)
+ result = errorIndication
+ self.assertEqual(result, None)
+
+
+ def test_post_dmaap(self):
+ """
+ Test post_dmaap()
+ """
+
+ # trap dict for logging
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger:
+ with patch('requests.Session.post') as magic_session_post:
+ fake_post_resp = Mock()
+ magic_session_post.return_value = fake_post_resp
+
+ fake_post_resp.status_code = requests.codes.moved_permanently
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 11)
+
+ fake_post_resp.status_code = requests.codes.ok
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 16)
+
+ magic_ecomp_logger.call_count = 0
+ tds.traps_since_last_publish = 1
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username"
+ tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password"
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 5)
+
+ # for some reason this exception is being seen as an OSError ????
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = requests.exceptions.RequestException("test throw")
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+ magic_ecomp_logger.call_count = 0
+ magic_session_post.side_effect = OSError()
+ snmptrapd.post_dmaap()
+ self.assertEqual(magic_ecomp_logger.call_count, 10)
+
+
+ @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method")
+ def test_comm_string_rewrite_observer(self):
+ """
+ test comm_string_rewrite_observer()
+ """
+ vars = { "communityName": ["name"] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "public")
+
+ vars = { "communityName": [] }
+ snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx")
+ assertEqual(vars["communitName"], "")
+
+
+ def test_snmp_engine_observer_cb(self):
+ """
+ test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx):
+ """
+
+ snmp_engine = "snmp_engine"
+ execpoint = "execpoint"
+ cbCtx = "cbCtx"
+ variables = {
+ 'transportDomain': [ 1, 2, 3 ],
+ 'transportAddress': [ "a", "b", "c" ]
+ }
+ for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]:
+ variables["securityModel"] = secmodel
+ snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx)
+ self.assertEqual(tds.trap_dict["protocol version"], ret)
-if __name__ == "__main__":
+ def test_add_varbind_to_log_string(self):
+ """
+ test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val)
+ """
+ vb_oid = "vb_oid"
+ vb_type = "vb_type"
+
+ class TempPP():
+ def prettyPrint(self):
+ return "pp ret"
+
+ vb_val = TempPP()
+
+ self.assertEqual(tds.all_vb_str, "")
+
+ snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret')
+
+ snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val)
+ self.assertEqual(tds.all_vb_str,
+ 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret')
+
+
+ def test_add_varbind_to_json(self):
+ """
+ test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val)
+ """
+
+ class TempPP():
+ def __init__(self, ret):
+ self.ret = ret
+ def prettyPrint(self):
+ return self.ret
+
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ vb_oid = TempPP("1.2.3")
+ override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0")
+
+ vb_type = "vb_type"
+
+ vb_val = TempPP("foo.example")
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["notify OID"], ".foo.example")
+ self.assertEqual(tds.trap_dict["notify OID len"], 2)
+
+ with patch('snmptrapd.resolve_ip') as magic_resolve_ip:
+ magic_resolve_ip.return_value = 'foo.example'
+ self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(tds.trap_dict["agent address"], "foo.example")
+ self.assertEqual(tds.trap_dict["agent name"], "foo.example")
+
+ sv_protocol_version = tds.trap_dict["protocol version"]
+ tds.trap_dict["protocol version"] = "v1"
+ self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0)
+ self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1)
+
+ tds.trap_dict["protocol version"] = sv_protocol_version
+ self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+ self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1)
+ self.assertEqual(tds.all_vb_json_str,
+ ', "varbinds": [{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"} ,{"varbind_oid": ".1.2.3", '
+ '"varbind_type": "octet", "varbind_value": '
+ '"foo.example"}')
+
+
+ @patch('snmptrapd.log_all_arriving_traps')
+ @patch('snmptrapd.post_dmaap', return_value = 0)
+ @patch('snmptrapd.ecomp_logger', return_value = 0)
+ @patch('snmptrapd.add_varbind_to_json', return_value = 1)
+ @patch('snmptrapd.add_varbind_to_log_string', return_value = 0)
+ def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json,
+ magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps):
+ """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """
+ with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=True):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertFalse(tds.first_trap)
+ self.assertEqual(magic_ecomp_logger.call_count, 8)
+
+ magic_ecomp_logger.call_count = 0
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 1
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.c_config["publisher"]["max_traps_between_publishes"] = 100
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0
+ tds.last_pub_time = 0
+ with patch('time.time', return_value=0):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+ magic_ecomp_logger.call_count = 0
+ tds.last_pub_time = 100000
+ tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1
+ with patch('time.time', return_value=10):
+ with patch('trapd_stormwatch.sw_storm_active', return_value=False):
+ snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx")
+ self.assertEqual(magic_ecomp_logger.call_count, 4)
+
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()