diff options
Diffstat (limited to 'tests/test_trapd_stormwatch.py')
-rw-r--r-- | tests/test_trapd_stormwatch.py | 250 |
1 files changed, 196 insertions, 54 deletions
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py index 16b0a17..3041884 100644 --- a/tests/test_trapd_stormwatch.py +++ b/tests/test_trapd_stormwatch.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,14 +14,15 @@ # limitations under the License. # ============LICENSE_END========================================================= -import pytest import unittest import trapd_exit import time +from unittest.mock import patch import trapd_stormwatch as sw import trapd_stormwatch_settings as sws import trapd_stats_settings as stats +import trapd_settings as tds class test_cleanup_and_exit(unittest.TestCase): @@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase): Test for presense of required vars """ - def test_increment_existing_counter(self): - """ - Test increment counter - """ - sw.sw_init() + @classmethod + def setUp(cls): + tds.init() stats.init() + sw.sw_init() + sws.init() - oid = ".1.2.3.4.5.6" - sws.sw_config_oid_dict[oid] = True - sws.sw_config_low_water_in_interval_dict[oid] = 1 - sws.sw_config_high_water_in_interval_dict[oid] = 10 - - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False - - self.assertEqual(result, True) - - # try again, but with stats.total_notifications removed - delattr(stats, "total_notifications") - try: - sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") - result = True - except: - result = False + def test_sw_init(self): + """ test sw_init() """ + sw.sw_init() + self.assertEqual(sws.sw_interval_in_seconds, 60) + sws.init() + self.assertEqual(sws.sw_config_category, {}) - self.assertEqual(result, True) def test_sw_clear_dicts(self): """ Test sw_clear_dicts """ - sw.sw_init() - # initialize attributes not handled by sw_init() - sws.sw_storm_counter_dict = {} - stats.agent_counter_dict = {} - stats.oid_counter_dict = {} - sws.sw_config_category = {} - # provide a value that can tested for - sws.sw_storm_counter_dict["abc"] = "def" + with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'): + with patch.dict('trapd_stormwatch_settings.sw_config_category'): + with patch.dict('trapd_stats_settings.agent_counter_dict'): + with patch.dict('trapd_stats_settings.oid_counter_dict'): + + # initialize attributes not handled by sw_init() + sws.sw_storm_counter_dict = {} + sws.sw_config_category = {} + stats.agent_counter_dict = {} + stats.oid_counter_dict = {} + + # provide a value that can tested for + sws.sw_storm_counter_dict["abc"] = "def" + self.assertTrue("abc" in sws.sw_storm_counter_dict) + + self.assertTrue(sw.sw_clear_dicts()) + self.assertFalse("abc" in sws.sw_storm_counter_dict) - sw.sw_clear_dicts() - self.assertFalse("abc" in sws.sw_storm_counter_dict) + # now make sure we get an exception + sws.sw_config_category = 3 + self.assertFalse(sw.sw_clear_dicts()) - # now make sure we get an exception - sws.sw_config_category = 3 - self.assertFalse(sw.sw_clear_dicts()) - # clean up the attributes we added above - delattr(sws, "sw_storm_counter_dict") - delattr(stats, "agent_counter_dict") - delattr(stats, "oid_counter_dict") - delattr(sws, "sw_config_category") + def test_sw_load_trap_config(self): + """ Test sw_load_trap_config(_config) """ + trap_dict_info = { + "uuid": "06f6e91c-3236-11e8-9953-005056865aac", + "agent address": "1.2.3.4", + "agent name": "test-agent.nodomain.com", + "cambria.partition": "test-agent.nodomain.com", + "community": "", + "community len": 0, + "epoch_serno": 15222068260000, + "protocol version": "v2c", + "time received": 1522206826.2938566, + "trap category": "ONAP-COLLECTOR-SNMPTRAP", + "sysUptime": "218567736", + "notify OID": "1.3.6.1.4.1.9999.9.9.999", + "trap_config": { }, + "notify OID len": 10, + } + + # normal operation, and variations + ret1 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret1, 0) + self.assertIsInstance(ret1, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { "oidx": "1.3.6.1.4.1.9999.9.9.888" } + ] + ret2 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret2, 0) + self.assertIsInstance(ret2, int) + + trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33 + ret3 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret3, 0) + self.assertIsInstance(ret3, int) + + trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50 + ret4 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret4, 0) + self.assertIsInstance(ret4, int) + + trap_dict_info["trap_config"]["notify_oids"] = [ + { + "oid": "1.3.6.1.4.1.9999.9.9.888", + "sw_high_water_in_interval": 3, + "sw_low_water_in_interval": 2, + "category": "abc", + } + ] + ret5 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret5, 1) + self.assertIsInstance(ret5, int) + + delattr(sws, 'sw_storm_active_dict') + ret6 = sw.sw_load_trap_config(trap_dict_info) + self.assertEqual(ret6, 1) + self.assertIsInstance(ret6, int) + def test_sw_log_metrics(self): """ Test sw_clear_log_metrics """ - sw.sw_init() - stats.total_notifications = 3 stats.total_notifications = 50 sws.sw_interval_in_seconds = 30 stats.agent_counter_dict = {"a": 3, "b": 40} stats.metric_log_notification_threshold_pct = 30 - sw.sw_log_metrics() - # make sure we got this far - assert True + with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger: + sw.sw_log_metrics() + self.assertEqual(magic_ecomp_logger.call_count, 3) + + + def test_increment_existing_counter(self): + """ + Test increment counter. There should NOT be an exception. + """ + oid = ".1.2.3.4.5.6" + sws.sw_config_oid_dict[oid] = True + sws.sw_config_low_water_in_interval_dict[oid] = 1 + sws.sw_config_high_water_in_interval_dict[oid] = 10 + + loc_agent = "192.168.1.1" + stats.agent_counter_dict[loc_agent] = 2 + stats.oid_counter_dict[oid] = 102 + + sv_total_notifications = stats.total_notifications + sv_agent_count_dict = stats.agent_counter_dict[loc_agent] + sv_oid_count_dict = stats.oid_counter_dict[oid] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+1) + self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1) + + # try again, without agent_counter_dict[loc_agent] + del stats.agent_counter_dict[loc_agent] + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except Exception as e: + result = False + self.assertEqual(result, True, "test_increment_existing_counter") + self.assertEqual(stats.total_notifications, sv_total_notifications+2) + self.assertEqual(stats.agent_counter_dict[loc_agent], 1) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2) + + # try again, but with stats.total_notifications removed + delattr(stats, "total_notifications") + + try: + sw.stats_increment_counters(loc_agent, oid) + result = True + except: + result = False + + self.assertEqual(result, True, "stats.total_notifications removed") + self.assertEqual(stats.total_notifications, 1) + self.assertEqual(stats.agent_counter_dict[loc_agent], 2) + self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3) + def test_sw_storm_active(self): """ Test sw_storm_active() """ - sw.sw_init() + print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}") + # initialize attributes not handled by sw_init() stats.oid_counter_dict = {} @@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase): self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) # now force sws.sw_last_stormwatch_dict_analysis to an old value - sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20 + sws.sw_last_stormwatch_dict_analysis = ( + int(time.time()) - sws.sw_interval_in_seconds - 20 + ) # and make certain that stats.oid_counter_dict got cleared. if not hasattr(stats, "oid_counter_dict"): stats.oid_counter_dict = {} @@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase): # .get("abc") != None) -if __name__ == "__main__": + @patch('trapd_stormwatch.ecomp_logger') + def test_sw_reset_counter_dict(self, magic_ecomp_logger): + """ test sw_reset_counter_dict() """ + + loc_agent = "192.168.1.1" + loc_oid = ".1.2.3.4.5.6" + loc_agent_oid = loc_agent + " " + loc_oid + + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 2) + + # cases: + # for lao in storm_active_dict: + # storm_counter_dict[lao] >= high_water_val[lo] + # storm_counter_dict[lao] < high_water_val[lo] + # storm_counter_dict[lao] < low_water_val[lo] + # storm_counter_dict[lao] >= low_water_val[lo] + + with patch.dict(sws.sw_storm_counter_dict, { + loc_agent_oid: 20 + }): + # values around the 20 above + for high_water_mark in [2, 60]: + # values around the 20 above + for low_water_mark in [0, 30]: + with patch.dict(sws.sw_config_high_water_in_interval_dict, { + loc_oid: high_water_mark + }): + with patch.dict(sws.sw_config_low_water_in_interval_dict, { + loc_oid: low_water_mark + }): + with patch.dict(sws.sw_storm_active_dict, { + loc_agent_oid: "anything" + }): + sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid] + magic_ecomp_logger.call_count = 0 + self.assertTrue(sw.sw_reset_counter_dict()) + self.assertEqual(magic_ecomp_logger.call_count, 3) + self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0) + + +if __name__ == "__main__": # pragma: no cover # sws.init() unittest.main() |