From 54712eff0f7a9823b807a741d1004c5ade9e18d0 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 18 Mar 2020 02:07:47 +0000 Subject: improve the code coverage Change-Id: I143f7f1e29817e9d4c0914e5349221fa63f5ce5d Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-1903 --- coverage.xml | 1454 +++++++++++++++++++++++++------------- snmptrap/mod/trapd_stormwatch.py | 27 +- tests/test_trapd_stormwatch.py | 114 +++ 3 files changed, 1091 insertions(+), 504 deletions(-) diff --git a/coverage.xml b/coverage.xml index 96da2c6..1ef294e 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,347 +1,388 @@ - + - C:\Users\vv770d\git\onap\snmptrap\snmptrap + /home/th1395x/onap/dcaegen2/snmptrap/snmptrapdiff --git a/snmptrap/mod/trapd_stormwatch.py b/snmptrap/mod/trapd_stormwatch.py index 623fe39..9e41bd0 100644 --- a/snmptrap/mod/trapd_stormwatch.py +++ b/snmptrap/mod/trapd_stormwatch.py @@ -80,14 +80,22 @@ def sw_clear_dicts(): :Variables: """ try: - stats.oid_counter_dict.clear() - stats.agent_counter_dict.clear() - sws.sw_storm_active_dict.clear() - sws.sw_storm_counter_dict.clear() - sws.sw_config_oid_dict.clear() - sws.sw_config_low_water_in_interval_dict.clear() - sws.sw_config_high_water_in_interval_dict.clear() - sws.sw_config_category.clear() + if hasattr(stats, "oid_counter_dict"): + stats.oid_counter_dict.clear() + if hasattr(stats, "agent_counter_dict"): + stats.agent_counter_dict.clear() + if hasattr(sws, "sw_storm_active_dict"): + sws.sw_storm_active_dict.clear() + if hasattr(sws, "sw_storm_counter_dict"): + sws.sw_storm_counter_dict.clear() + if hasattr(sws, "sw_config_oid_dict"): + sws.sw_config_oid_dict.clear() + if hasattr(sws, "sw_config_low_water_in_interval_dict"): + sws.sw_config_low_water_in_interval_dict.clear() + if hasattr(sws, "sw_config_high_water_in_interval_dict"): + sws.sw_config_high_water_in_interval_dict.clear() + if hasattr(sws, "sw_config_category"): + sws.sw_config_category.clear() return True except Exception as e: msg = "unable to reset stormwatch dictionaries - results will be indeterminate: %s" % ( @@ -367,6 +375,8 @@ def sw_storm_active(_loc_agent, _loc_oid): # if we got this far, trap is in stormwatch configs, we've incremented # counter in sw_storm_counter_dict - figure out if we are over limit if sws.sw_storm_counter_dict[_dict_key] > _high_water_val: + print(f"sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})") + _loc_agent = _dict_key.split()[0] _loc_oid = _dict_key.split()[1] msg = "STORM ACTIVE: received %d events (%s) from %s (greater than high water threshold: %d)" % ( @@ -381,6 +391,7 @@ def sw_storm_active(_loc_agent, _loc_oid): tds.CODE_GENERAL, msg) return True else: + print(f"NOT sws.sw_storm_counter_dict[{_dict_key}]({sws.sw_storm_counter_dict[_dict_key]}) > _high_water_val ({_high_water_val})") return False # # # # # # # # # # # # # diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py index 236157e..6463a0c 100644 --- a/tests/test_trapd_stormwatch.py +++ b/tests/test_trapd_stormwatch.py @@ -17,6 +17,7 @@ import pytest import unittest import trapd_exit +import time import trapd_stormwatch as sw import trapd_stormwatch_settings as sws @@ -48,6 +49,119 @@ class test_cleanup_and_exit(unittest.TestCase): self.assertEqual(result, True) + # try again, but with stats.total_notifications removed + delattr(stats, "total_notifications") + + try: + sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6") + result = True + except: + result = False + + self.assertEqual(result, True) + + def test_sw_clear_dicts(self): + """ + Test sw_clear_dicts + """ + sw.sw_init() + # initialize attributes not handled by sw_init() + sws.sw_storm_counter_dict = {} + stats.agent_counter_dict = {} + stats.oid_counter_dict = {} + sws.sw_config_category = {} + # provide a value that can tested for + sws.sw_storm_counter_dict["abc"] = "def" + + sw.sw_clear_dicts() + self.assertFalse("abc" in sws.sw_storm_counter_dict) + + # now make sure we get an exception + sws.sw_config_category = 3 + self.assertFalse(sw.sw_clear_dicts()) + + # clean up the attributes we added above + delattr(sws, "sw_storm_counter_dict") + delattr(stats, "agent_counter_dict") + delattr(stats, "oid_counter_dict") + delattr(sws, "sw_config_category") + + def test_sw_log_metrics(self): + """ + Test sw_clear_log_metrics + """ + sw.sw_init() + + stats.total_notifications = 3 + stats.total_notifications = 50 + sws.sw_interval_in_seconds = 30 + stats.agent_counter_dict = { "a": 3, "b": 40 } + stats.metric_log_notification_threshold_pct = 30 + sw.sw_log_metrics() + + # make sure we got this far + assert(True) + + def test_sw_storm_active(self): + """ + Test sw_storm_active() + """ + sw.sw_init() + # initialize attributes not handled by sw_init() + stats.oid_counter_dict = {} + + # initialize attributes for the test + loc_agent = "192.168.1.1" + loc_oid = ".1.2.3.4.5.6" + sws.sw_config_high_water_in_interval_dict[loc_oid] = 50 + sws.sw_storm_counter_dict = {} + dict_key = loc_agent + " " + loc_oid + + # Four cases to test. + + # #1 + # if sws.sw_storm_active_dict[dict_key] exists + # return True + # #2 + # else (sws.sw_storm_active_dict does not exist) + # if sws.sw_storm_counter_dict[dict_key] > sws.sw_config_high_water_in_interval_dict[loc_oid] + # create sws.sw_storm_active_dict[dict_key] + # return True + # #3 + # else + # return False + # #4 + # sws.sw_last_stormwatch_dict_analysis gets reset often. + # but if time.time() - sw_last_stormwatch_dict_analysis > than sw_interval_in_seconds, + # then sw_reset_counter_dict() is invoked. + + # start with sws.sw_storm_active_dict[dict_key] does not exist + # and sws.sw_storm_counter_dict[dict_key] < high_water_mark + sws.sw_storm_active_dict = {} + sws.sw_storm_counter_dict[dict_key] = 10 + # Should return False + self.assertFalse(sw.sw_storm_active(loc_agent, loc_oid)) + # self.assertFalse(hasattr(sws, "sw_storm_active_dict")) + + # now with sws.sw_storm_counter_dict[dict_key] > high_water_mark + sws.sw_storm_counter_dict[dict_key] = 60 + # should create sws.sw_storm_active_dict[dict_key] and return True + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + self.assertTrue(sws.sw_storm_active_dict.get(dict_key) != None) + + # now that sws.sw_storm_active_dict[dict_key] exists + # should return True + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + + # now force sws.sw_last_stormwatch_dict_analysis to an old value + sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20 + # and make certain that stats.oid_counter_dict got cleared. + if not hasattr(stats, "oid_counter_dict"): + stats.oid_counter_dict = {} + stats.oid_counter_dict["abc"] = 5 + self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid)) + self.assertTrue(not hasattr(sws,"oid_counter_dict")) + # .get("abc") != None) if __name__ == '__main__': # sws.init() -- cgit 1.2.3-korg