diff options
author | Hansen, Tony (th1395) <th1395@att.com> | 2020-03-18 02:07:47 +0000 |
---|---|---|
committer | Hansen, Tony (th1395) <th1395@att.com> | 2020-03-18 02:08:01 +0000 |
commit | 54712eff0f7a9823b807a741d1004c5ade9e18d0 (patch) | |
tree | 61f3f891fc6a23874f124b0c0dd268d9be8bcc97 /tests/test_trapd_stormwatch.py | |
parent | 4d1fa90a01e0d77dc1e03db343e9d70bd7252509 (diff) |
improve the code coverage6.0.0-ONAPfrankfurt
Change-Id: I143f7f1e29817e9d4c0914e5349221fa63f5ce5d
Signed-off-by: Hansen, Tony (th1395) <th1395@att.com>
Issue-ID: DCAEGEN2-1903
Diffstat (limited to 'tests/test_trapd_stormwatch.py')
-rw-r--r-- | tests/test_trapd_stormwatch.py | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py index 236157e..6463a0c 100644 --- a/tests/test_trapd_stormwatch.py +++ b/tests/test_trapd_stormwatch.py @@ -17,6 +17,7 @@ import pytest import unittest import trapd_exit +import time import trapd_stormwatch as sw import trapd_stormwatch_settings as sws @@ -48,6 +49,119 @@ class test_cleanup_and_exit(unittest.TestCase): self.assertEqual(result, True) + # try again, but with stats.total_notifications removed + delattr(stats, "total_notifications") + + try: + sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") + result = True + except: + result = False + + self.assertEqual(result, True) + + def test_sw_clear_dicts(self): + """ + Test sw_clear_dicts + """ + sw.sw_init() + # initialize attributes not handled by sw_init() + sws.sw_storm_counter_dict = {} + stats.agent_counter_dict = {} + stats.oid_counter_dict = {} + sws.sw_config_category = {} + # provide a value that can tested for + sws.sw_storm_counter_dict["abc"] = "def" + + sw.sw_clear_dicts() + self.assertFalse("abc" in sws.sw_storm_counter_dict) + + # now make sure we get an exception + sws.sw_config_category = 3 + self.assertFalse(sw.sw_clear_dicts()) + + # clean up the attributes we added above + delattr(sws, "sw_storm_counter_dict") + delattr(stats, "agent_counter_dict") + delattr(stats, "oid_counter_dict") + delattr(sws, "sw_config_category") + + def test_sw_log_metrics(self): + """ + Test sw_clear_log_metrics + """ + sw.sw_init() + + stats.total_notifications = 3 + stats.total_notifications = 50 + sws.sw_interval_in_seconds = 30 + stats.agent_counter_dict = { "a": 3, "b": 40 } + stats.metric_log_notification_threshold_pct = 30 + sw.sw_log_metrics() + + # make sure we got this far + assert(True) + + def test_sw_storm_active(self): + """ + Test sw_storm_active() + """ + sw.sw_init() + # initialize attributes not handled by sw_init() + stats.oid_counter_dict = {} + + # initialize attributes for the test + loc_agent = "192.168.1.1" + loc_oid = ".1.2.3.4.5.6" + sws.sw_config_high_water_in_interval_dict[loc_oid] = 50 + sws.sw_storm_counter_dict = {} + dict_key = loc_agent + " " + loc_oid + + # Four cases to test. + + # #1 + # if sws.sw_storm_active_dict[dict_key] exists + # return True + # #2 + # else (sws.sw_storm_active_dict does not exist) + # if sws.sw_storm_counter_dict[dict_key] > sws.sw_config_high_water_in_interval_dict[loc_oid] + # create sws.sw_storm_active_dict[dict_key] + # return True + # #3 + # else + # return False + # #4 + # sws.sw_last_stormwatch_dict_analysis gets reset often. + # but if time.time() - sw_last_stormwatch_dict_analysis > than sw_interval_in_seconds, + # then sw_reset_counter_dict() is invoked. + + # start with sws.sw_storm_active_dict[dict_key] does not exist + # and sws.sw_storm_counter_dict[dict_key] < high_water_mark + sws.sw_storm_active_dict = {} + sws.sw_storm_counter_dict[dict_key] = 10 + # Should return False + self.assertFalse(sw.sw_storm_active(loc_agent, loc_oid)) + # self.assertFalse(hasattr(sws, "sw_storm_active_dict")) + + # now with sws.sw_storm_counter_dict[dict_key] > high_water_mark + sws.sw_storm_counter_dict[dict_key] = 60 + # should create sws.sw_storm_active_dict[dict_key] and return True + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + self.assertTrue(sws.sw_storm_active_dict.get(dict_key) != None) + + # now that sws.sw_storm_active_dict[dict_key] exists + # should return True + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + + # now force sws.sw_last_stormwatch_dict_analysis to an old value + sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20 + # and make certain that stats.oid_counter_dict got cleared. + if not hasattr(stats, "oid_counter_dict"): + stats.oid_counter_dict = {} + stats.oid_counter_dict["abc"] = 5 + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + self.assertTrue(not hasattr(sws,"oid_counter_dict")) + # .get("abc") != None) if __name__ == '__main__': # sws.init() |