diff options
Diffstat (limited to 'tests/test_trapd_io.py')
-rw-r--r-- | tests/test_trapd_io.py | 521 |
1 files changed, 446 insertions, 75 deletions
diff --git a/tests/test_trapd_io.py b/tests/test_trapd_io.py index c1702aa..f3d1a4c 100644 --- a/tests/test_trapd_io.py +++ b/tests/test_trapd_io.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2019-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +14,21 @@ # limitations under the License. # ============LICENSE_END========================================================= +import datetime +import glob +import io +import json import os -import pytest +from pathlib import Path +import sys +import tempfile import unittest +from unittest.mock import patch + import snmptrapd -import datetime -import json import trapd_settings as tds import trapd_runtime_pid import trapd_io -import sys -from pathlib import Path class test_trapd_io(unittest.TestCase): @@ -32,32 +36,141 @@ class test_trapd_io(unittest.TestCase): Test the save_pid mod """ - tds.c_config = json.loads( - '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, "protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, "ipv6_interface": "::1", "ipv6_port": 6162 }, "cache": { "dns_cache_ttl_seconds": 60 }, "publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, "http_milliseconds_between_retries": 750, "http_primary_publisher": "true", "http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, "max_milliseconds_between_publishes": 10000 }, "streams_publishes": { "sec_fault_unsecure": { "type": "message_router", "aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, "client_role": null, "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, "aaf_username": null } }, "files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", "data_dir": "data", "pid_dir": "tmp", "arriving_traps_log": "snmptrapd_arriving_traps.log", "snmptrapd_diag": "snmptrapd_prog_diag.log", "traps_stats_log": "snmptrapd_stats.csv", "perm_status_file": "snmptrapd_status.log", "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", "eelf_debug": "debug.log", "eelf_audit": "audit.log", "eelf_metrics": "metrics.log", "roll_frequency": "day", "minimum_severity_to_log": 2 }, "trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { ".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, "sw_low_water_in_interval": 7, "category": "logonly" }, ".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, "sw_low_water_in_interval": 3, "category": "logonly" } } }, "snmpv3_config": { "usm_users": [ { "user": "usr-sha-aes256", "engineId": "8000000001020304", "usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, { "user": "user1", "engineId": "8000000000000001", "usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, { "user": "user2", "engineId": "8000000000000002", "usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, { "user": "user3", "engineId": "8000000000000003", "usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } ] } }' - ) + class PseudoFile(): + """ test file-like object that does nothing """ + def write(self): + pass + def close(self): + pass - def test_roll_all_files_notopen(self): - """ - Test rolling files that aren't open - """ + class WriteThrows(): + """ test file-like object that throws on a write """ + def write(self): + raise RuntimeError("close() throws") + + + @classmethod + def setUpClass(cls): + + tds.init() + + snmptrap_dir = "/tmp/opt/app/snmptrap" + try: + Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) + Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) + except Exception as e: + print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + sys.exit(1) + + # fmt: off + tds.c_config = json.loads( + '{ "snmptrapd": { ' + ' "version": "1.4.0", ' + ' "title": "ONAP SNMP Trap Receiver" }, ' + '"protocols": { ' + ' "transport": "udp", ' + ' "ipv4_interface": "0.0.0.0", ' + ' "ipv4_port": 6162, ' + ' "ipv6_interface": "::1", ' + ' "ipv6_port": 6162 }, ' + '"cache": { ' + ' "dns_cache_ttl_seconds": 60 }, ' + '"publisher": { ' + ' "http_timeout_milliseconds": 1500, ' + ' "http_retries": 3, ' + ' "http_milliseconds_between_retries": 750, ' + ' "http_primary_publisher": "true", ' + ' "http_peer_publisher": "unavailable", ' + ' "max_traps_between_publishes": 10, ' + ' "max_milliseconds_between_publishes": 10000 }, ' + '"streams_publishes": { ' + ' "sec_fault_unsecure": { ' + ' "type": "message_router", ' + ' "aaf_password": null, ' + ' "dmaap_info": { ' + ' "location": "mtl5", ' + ' "client_id": null, ' + ' "client_role": null, ' + ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' + ' "aaf_username": null } }, ' + '"files": { ' + ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' + ' "log_dir": "logs", ' + ' "data_dir": "data", ' + ' "pid_dir": "tmp", ' + ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' + ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' + ' "traps_stats_log": "snmptrapd_stats.csv", ' + ' "perm_status_file": "snmptrapd_status.log", ' + ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' + ' "eelf_error": "error.log", ' + ' "eelf_debug": "debug.log", ' + ' "eelf_audit": "audit.log", ' + ' "eelf_metrics": "metrics.log", ' + ' "roll_frequency": "day", ' + ' "minimum_severity_to_log": 2 }, ' + '"trap_config": { ' + ' "sw_interval_in_seconds": 60, ' + ' "notify_oids": { ' + ' ".1.3.6.1.4.1.9.0.1": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.2": { ' + ' "sw_high_water_in_interval": 101, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.3": { ' + ' "sw_high_water_in_interval": 102, ' + ' "sw_low_water_in_interval": 7, ' + ' "category": "logonly" }, ' + ' ".1.3.6.1.4.1.9.0.4": { ' + ' "sw_high_water_in_interval": 10, ' + ' "sw_low_water_in_interval": 3, ' + ' "category": "logonly" } } }, ' + '"snmpv3_config": { ' + ' "usm_users": [ { ' + ' "user": "usr-sha-aes256", ' + ' "engineId": "8000000001020304", ' + ' "usmHMACSHAAuth": "authkey1", ' + ' "usmAesCfb256": "privkey1" }, ' + ' { "user": "user1", ' + ' "engineId": "8000000000000001", ' + ' "usmHMACMD5Auth": "authkey1", ' + ' "usmDESPriv": "privkey1" }, ' + ' { "user": "user2", ' + ' "engineId": "8000000000000002", ' + ' "usmHMACSHAAuth": "authkey2", ' + ' "usmAesCfb128": "privkey2" }, ' + ' { "user": "user3", ' + ' "engineId": "8000000000000003", ' + ' "usmHMACSHAAuth": "authkey3", ' + ' "usmAesCfb256": "privkey3" } ' + '] } }' + ) + # fmt: on + tds.json_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json" + ) + tds.arriving_traps_filename = ( + tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log" + ) - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_error_file(self): """ Test bad error file location """ - # open eelf error logs - tds.c_config["files.eelf_error"] = "/bad_dir/error.log" + with patch.dict(tds.c_config["files"]): + # open eelf error logs + tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_debug_file(self): """ @@ -65,86 +178,248 @@ class test_trapd_io(unittest.TestCase): """ # open eelf debug logs - tds.c_config["files.eelf_debug"] = "/bad_dir/debug.log" + with patch.dict(tds.c_config["files"]): + tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_audit_file(self): """ Test bad audit file location """ - # open eelf debug logs - tds.c_config["files.eelf_audit"] = "/bad_dir/audit.log" + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit def test_open_eelf_metrics_file(self): """ Test bad metrics file location """ + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log" + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_error_file_missing_name(self): + """ + Test bad error file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf error logs + del tds.c_config["files"]["eelf_error"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_debug_file_missing_name(self): + """ + Test bad debug file location + """ + # open eelf debug logs - tds.c_config["files.eelf_metrics"] = "/bad_dir/metrics.log" + with patch.dict(tds.c_config["files"]): + del tds.c_config["files"]["eelf_debug"] - # try to open file in non-existent dir - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_eelf_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() - def test_roll_all_logs(self): + + def test_open_eelf_audit_file_missing_name(self): + """ + Test bad audit file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_audit"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_open_eelf_metrics_file_missing_name(self): + """ + Test bad metrics file location + """ + + with patch.dict(tds.c_config["files"]): + # open eelf debug logs + del tds.c_config["files"]["eelf_metrics"] + + # try to open file in non-existent dir + with self.assertRaises(SystemExit): + result = trapd_io.open_eelf_logs() + + + def test_roll_all_logs_not_open(self): """ Test roll of logs when not open """ - # try to roll logs when not open - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.roll_all_logs() - assert pytest_wrapped_exception.type == SystemExit + # try to roll logs when not open. Shouldn't fail + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + def test_roll_all_logs(self): + """ + Test rolling files that they are open + """ + + trapd_io.open_eelf_logs() + # try to roll logs + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_roll_file_throws(self): + """ + Test rolling files that they are open + but roll_file throws an exception + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.roll_file') as roll_file_throws: + roll_file_throws.side_effect = RuntimeError("roll_file() throws") + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_eelf_logs_returns_false(self): + """ + Test rolling files that they are open + but open_eelf_logs returns false + """ + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws: + open_eelf_logs_throws.return_value = False + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_json_traps_throws(self): + """ + Test rolling files that they are open + but open_file(json_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.json_traps_filename: + raise RuntimeError("json_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + + + def test_roll_all_logs_open_file_arriving_traps_throws(self): + """ + Test rolling files that they are open + but open_file(arriving_traps_filename) throws an exception + """ + + def tmp_func(nm): + if nm == tds.arriving_traps_filename: + raise RuntimeError("arriving_traps_filename throws") + return test_trapd_io.PseudoFile() + + trapd_io.open_eelf_logs() + # try to roll logs + with patch('trapd_io.open_file') as open_file_throws: + open_file_throws.side_effect = tmp_func + with self.assertRaises(SystemExit): + trapd_io.roll_all_logs() + self.assertIsNotNone(tds.eelf_error_fd) + def test_roll_file(self): """ Test roll of individual file when not present """ + # try to roll a valid log file + with tempfile.TemporaryDirectory() as ntd: + fn = ntd + "/test.log" + with open(fn, "w") as ofp: + self.assertTrue(trapd_io.roll_file(fn)) + # The file will be renamed to something like + # test.log.2022-08-17T20:28:32 + self.assertFalse(os.path.exists(fn)) + # We could also add a test to see if there is a file + # with a name like that. + files = list(glob.glob(f"{ntd}/*")) + print(f"files={files}") + self.assertEqual(len(files), 1) + self.assertTrue(files[0].startswith(fn + ".")) + + + def test_roll_file_not_present(self): + """ + Test roll of individual file when not present + """ + # try to roll logs when not open - result = trapd_io.roll_file("/file/not/present") - self.assertEqual(result, False) + self.assertFalse(trapd_io.roll_file("/file/not/present")) + def test_roll_file_no_write_perms(self): """ try to roll logs when not enough perms """ - no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" - no_perms_file = "test.dat" - no_perms_fp = no_perms_dir + "/" + no_perms_file + with tempfile.TemporaryDirectory() as no_perms_dir: + # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" + no_perms_file = "test.dat" + no_perms_fp = no_perms_dir + "/" + no_perms_file - # required directory tree - try: - Path(no_perms_dir).mkdir(parents=True, exist_ok=True) - os.chmod(no_perms_dir, 0o777) - except Exception as e: - print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) - sys.exit(1) + # required directory tree + #try: + # Path(no_perms_dir).mkdir(parents=True, exist_ok=True) + # os.chmod(no_perms_dir, 0o700) + #except Exception as e: + # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) + + # create empty file + open(no_perms_fp, "w").close() + os.chmod(no_perms_dir, 0o555) - # create empty file - open(no_perms_fp, "a").close() - os.chmod(no_perms_dir, 0o444) + # try to roll file in dir with no write perms + self.assertFalse(trapd_io.roll_file(no_perms_fp)) - result = trapd_io.roll_file(no_perms_fp) - self.assertEqual(result, False) + # the file should still be there + open(no_perms_fp).close() + + # allow the directory to be removed + os.chmod(no_perms_dir, 0o700) - # try to roll file in dir with no write perms - with pytest.raises(SystemExit) as pytest_wrapped_exception: - result = trapd_io.open_file(no_perms_fp) - assert pytest_wrapped_exception.type == SystemExit def test_open_file_exists(self): """ @@ -156,8 +431,9 @@ class test_trapd_io(unittest.TestCase): # try to roll logs when not open result = trapd_io.open_file(test_file) - compare = str(result).startswith("<_io.TextIOWrapper name=") - self.assertEqual(compare, True) + self.assertTrue(str(result).startswith("<_io.TextIOWrapper name=")) + self.assertIsInstance(result, io.TextIOWrapper) + def test_open_file_exists_does_not_exist(self): """ @@ -168,9 +444,9 @@ class test_trapd_io(unittest.TestCase): test_file = "/tmp/no_such_dir/snmptrap_pytest" # try to open file when dir not present - with pytest.raises(SystemExit) as pytest_wrapped_exception: + with self.assertRaises(SystemExit): result = trapd_io.open_file(test_file) - assert pytest_wrapped_exception.type == SystemExit + def test_close_file_exists(self): """ @@ -182,18 +458,113 @@ class test_trapd_io(unittest.TestCase): test_file = trapd_io.open_file(test_file_name) # close active file - result = trapd_io.close_file(test_file, test_file_name) - self.assertEqual(result, True) + self.assertTrue(trapd_io.close_file(test_file, test_file_name)) + - def test_close_file_does_not_exists(self): + def test_close_file_does_not_exist(self): """ Test closing non-existent file """ # try to roll logs when not open - result = trapd_io.close_file(None, None) - self.assertEqual(result, False) + self.assertFalse(trapd_io.close_file(None, None)) + + + def test_ecomp_logger_type_error(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_error_bad_fd(self): + """ + test trapd_io.ecomp_logger, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_unknown_bad_fd(self): + """ + test trapd_io.ecomp_logger, unknown type, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_error_fd = tds.eelf_error_fd + tds.eelf_error_fd = test_trapd_io.WriteThrows() + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_error_fd = sv_eelf_error_fd + + + def test_ecomp_logger_type_metrics(self): + """ + test trapd_io.ecomp_logger to metrics + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_metrics_bad_fd(self): + """ + test trapd_io.ecomp_logger to metrics, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_metrics_fd = tds.eelf_metrics_fd + tds.eelf_metrics_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_metrics_fd = sv_eelf_metrics_fd + + + def test_ecomp_logger_type_audit(self): + """ + test trapd_io.ecomp_logger to audit log + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + + + def test_ecomp_logger_type_audit_bad_fd(self): + """ + test trapd_io.ecomp_logger to audit log, but write() throws + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + # the following SHOULD be done with a context patch + sv_eelf_audit_fd = tds.eelf_audit_fd + tds.eelf_audit_fd = test_trapd_io.WriteThrows() + self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) + tds.eelf_audit_fd = sv_eelf_audit_fd + + + def test_ecomp_logger_type_unknown(self): + """ + test trapd_io.ecomp_logger + """ + + trapd_io.open_eelf_logs() + msg = "this is a test" + self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover unittest.main() |