aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_trapd_stormwatch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_trapd_stormwatch.py')
-rw-r--r--tests/test_trapd_stormwatch.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py
index 236157e..6463a0c 100644
--- a/tests/test_trapd_stormwatch.py
+++ b/tests/test_trapd_stormwatch.py
@@ -17,6 +17,7 @@
import pytest
import unittest
import trapd_exit
+import time
import trapd_stormwatch as sw
import trapd_stormwatch_settings as sws
@@ -48,6 +49,119 @@ class test_cleanup_and_exit(unittest.TestCase):
self.assertEqual(result, True)
+ # try again, but with stats.total_notifications removed
+ delattr(stats, "total_notifications")
+
+ try:
+ sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
+ result = True
+ except:
+ result = False
+
+ self.assertEqual(result, True)
+
+ def test_sw_clear_dicts(self):
+ """
+ Test sw_clear_dicts
+ """
+ sw.sw_init()
+ # initialize attributes not handled by sw_init()
+ sws.sw_storm_counter_dict = {}
+ stats.agent_counter_dict = {}
+ stats.oid_counter_dict = {}
+ sws.sw_config_category = {}
+ # provide a value that can tested for
+ sws.sw_storm_counter_dict["abc"] = "def"
+
+ sw.sw_clear_dicts()
+ self.assertFalse("abc" in sws.sw_storm_counter_dict)
+
+ # now make sure we get an exception
+ sws.sw_config_category = 3
+ self.assertFalse(sw.sw_clear_dicts())
+
+ # clean up the attributes we added above
+ delattr(sws, "sw_storm_counter_dict")
+ delattr(stats, "agent_counter_dict")
+ delattr(stats, "oid_counter_dict")
+ delattr(sws, "sw_config_category")
+
+ def test_sw_log_metrics(self):
+ """
+ Test sw_clear_log_metrics
+ """
+ sw.sw_init()
+
+ stats.total_notifications = 3
+ stats.total_notifications = 50
+ sws.sw_interval_in_seconds = 30
+ stats.agent_counter_dict = { "a": 3, "b": 40 }
+ stats.metric_log_notification_threshold_pct = 30
+ sw.sw_log_metrics()
+
+ # make sure we got this far
+ assert(True)
+
+ def test_sw_storm_active(self):
+ """
+ Test sw_storm_active()
+ """
+ sw.sw_init()
+ # initialize attributes not handled by sw_init()
+ stats.oid_counter_dict = {}
+
+ # initialize attributes for the test
+ loc_agent = "192.168.1.1"
+ loc_oid = ".1.2.3.4.5.6"
+ sws.sw_config_high_water_in_interval_dict[loc_oid] = 50
+ sws.sw_storm_counter_dict = {}
+ dict_key = loc_agent + " " + loc_oid
+
+ # Four cases to test.
+
+ # #1
+ # if sws.sw_storm_active_dict[dict_key] exists
+ # return True
+ # #2
+ # else (sws.sw_storm_active_dict does not exist)
+ # if sws.sw_storm_counter_dict[dict_key] > sws.sw_config_high_water_in_interval_dict[loc_oid]
+ # create sws.sw_storm_active_dict[dict_key]
+ # return True
+ # #3
+ # else
+ # return False
+ # #4
+ # sws.sw_last_stormwatch_dict_analysis gets reset often.
+ # but if time.time() - sw_last_stormwatch_dict_analysis > than sw_interval_in_seconds,
+ # then sw_reset_counter_dict() is invoked.
+
+ # start with sws.sw_storm_active_dict[dict_key] does not exist
+ # and sws.sw_storm_counter_dict[dict_key] < high_water_mark
+ sws.sw_storm_active_dict = {}
+ sws.sw_storm_counter_dict[dict_key] = 10
+ # Should return False
+ self.assertFalse(sw.sw_storm_active(loc_agent, loc_oid))
+ # self.assertFalse(hasattr(sws, "sw_storm_active_dict"))
+
+ # now with sws.sw_storm_counter_dict[dict_key] > high_water_mark
+ sws.sw_storm_counter_dict[dict_key] = 60
+ # should create sws.sw_storm_active_dict[dict_key] and return True
+ self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
+ self.assertTrue(sws.sw_storm_active_dict.get(dict_key) != None)
+
+ # now that sws.sw_storm_active_dict[dict_key] exists
+ # should return True
+ self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
+
+ # now force sws.sw_last_stormwatch_dict_analysis to an old value
+ sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20
+ # and make certain that stats.oid_counter_dict got cleared.
+ if not hasattr(stats, "oid_counter_dict"):
+ stats.oid_counter_dict = {}
+ stats.oid_counter_dict["abc"] = 5
+ self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
+ self.assertTrue(not hasattr(sws,"oid_counter_dict"))
+ # .get("abc") != None)
if __name__ == '__main__':
# sws.init()