aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_trapd_stormwatch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_trapd_stormwatch.py')
-rw-r--r--tests/test_trapd_stormwatch.py250
1 files changed, 196 insertions, 54 deletions
diff --git a/tests/test_trapd_stormwatch.py b/tests/test_trapd_stormwatch.py
index 16b0a17..3041884 100644
--- a/tests/test_trapd_stormwatch.py
+++ b/tests/test_trapd_stormwatch.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2020-2021 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,14 +14,15 @@
# limitations under the License.
# ============LICENSE_END=========================================================
-import pytest
import unittest
import trapd_exit
import time
+from unittest.mock import patch
import trapd_stormwatch as sw
import trapd_stormwatch_settings as sws
import trapd_stats_settings as stats
+import trapd_settings as tds
class test_cleanup_and_exit(unittest.TestCase):
@@ -29,84 +30,182 @@ class test_cleanup_and_exit(unittest.TestCase):
Test for presense of required vars
"""
- def test_increment_existing_counter(self):
- """
- Test increment counter
- """
- sw.sw_init()
+ @classmethod
+ def setUp(cls):
+ tds.init()
stats.init()
+ sw.sw_init()
+ sws.init()
- oid = ".1.2.3.4.5.6"
- sws.sw_config_oid_dict[oid] = True
- sws.sw_config_low_water_in_interval_dict[oid] = 1
- sws.sw_config_high_water_in_interval_dict[oid] = 10
-
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
-
- self.assertEqual(result, True)
-
- # try again, but with stats.total_notifications removed
- delattr(stats, "total_notifications")
- try:
- sw.stats_increment_counters("192.168.1.1", ".1.2.3.4.5.6")
- result = True
- except:
- result = False
+ def test_sw_init(self):
+ """ test sw_init() """
+ sw.sw_init()
+ self.assertEqual(sws.sw_interval_in_seconds, 60)
+ sws.init()
+ self.assertEqual(sws.sw_config_category, {})
- self.assertEqual(result, True)
def test_sw_clear_dicts(self):
"""
Test sw_clear_dicts
"""
- sw.sw_init()
- # initialize attributes not handled by sw_init()
- sws.sw_storm_counter_dict = {}
- stats.agent_counter_dict = {}
- stats.oid_counter_dict = {}
- sws.sw_config_category = {}
- # provide a value that can tested for
- sws.sw_storm_counter_dict["abc"] = "def"
+ with patch.dict('trapd_stormwatch_settings.sw_storm_counter_dict'):
+ with patch.dict('trapd_stormwatch_settings.sw_config_category'):
+ with patch.dict('trapd_stats_settings.agent_counter_dict'):
+ with patch.dict('trapd_stats_settings.oid_counter_dict'):
+
+ # initialize attributes not handled by sw_init()
+ sws.sw_storm_counter_dict = {}
+ sws.sw_config_category = {}
+ stats.agent_counter_dict = {}
+ stats.oid_counter_dict = {}
+
+ # provide a value that can tested for
+ sws.sw_storm_counter_dict["abc"] = "def"
+ self.assertTrue("abc" in sws.sw_storm_counter_dict)
+
+ self.assertTrue(sw.sw_clear_dicts())
+ self.assertFalse("abc" in sws.sw_storm_counter_dict)
- sw.sw_clear_dicts()
- self.assertFalse("abc" in sws.sw_storm_counter_dict)
+ # now make sure we get an exception
+ sws.sw_config_category = 3
+ self.assertFalse(sw.sw_clear_dicts())
- # now make sure we get an exception
- sws.sw_config_category = 3
- self.assertFalse(sw.sw_clear_dicts())
- # clean up the attributes we added above
- delattr(sws, "sw_storm_counter_dict")
- delattr(stats, "agent_counter_dict")
- delattr(stats, "oid_counter_dict")
- delattr(sws, "sw_config_category")
+ def test_sw_load_trap_config(self):
+ """ Test sw_load_trap_config(_config) """
+ trap_dict_info = {
+ "uuid": "06f6e91c-3236-11e8-9953-005056865aac",
+ "agent address": "1.2.3.4",
+ "agent name": "test-agent.nodomain.com",
+ "cambria.partition": "test-agent.nodomain.com",
+ "community": "",
+ "community len": 0,
+ "epoch_serno": 15222068260000,
+ "protocol version": "v2c",
+ "time received": 1522206826.2938566,
+ "trap category": "ONAP-COLLECTOR-SNMPTRAP",
+ "sysUptime": "218567736",
+ "notify OID": "1.3.6.1.4.1.9999.9.9.999",
+ "trap_config": { },
+ "notify OID len": 10,
+ }
+
+ # normal operation, and variations
+ ret1 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret1, 0)
+ self.assertIsInstance(ret1, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ { "oidx": "1.3.6.1.4.1.9999.9.9.888" }
+ ]
+ ret2 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret2, 0)
+ self.assertIsInstance(ret2, int)
+
+ trap_dict_info["trap_config"]["metric_log_notification_threshold_pct"] = 33
+ ret3 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret3, 0)
+ self.assertIsInstance(ret3, int)
+
+ trap_dict_info["trap_config"]["sw_interval_in_seconds"] = 50
+ ret4 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret4, 0)
+ self.assertIsInstance(ret4, int)
+
+ trap_dict_info["trap_config"]["notify_oids"] = [
+ {
+ "oid": "1.3.6.1.4.1.9999.9.9.888",
+ "sw_high_water_in_interval": 3,
+ "sw_low_water_in_interval": 2,
+ "category": "abc",
+ }
+ ]
+ ret5 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret5, 1)
+ self.assertIsInstance(ret5, int)
+
+ delattr(sws, 'sw_storm_active_dict')
+ ret6 = sw.sw_load_trap_config(trap_dict_info)
+ self.assertEqual(ret6, 1)
+ self.assertIsInstance(ret6, int)
+
def test_sw_log_metrics(self):
"""
Test sw_clear_log_metrics
"""
- sw.sw_init()
-
stats.total_notifications = 3
stats.total_notifications = 50
sws.sw_interval_in_seconds = 30
stats.agent_counter_dict = {"a": 3, "b": 40}
stats.metric_log_notification_threshold_pct = 30
- sw.sw_log_metrics()
- # make sure we got this far
- assert True
+ with patch('trapd_stormwatch.ecomp_logger') as magic_ecomp_logger:
+ sw.sw_log_metrics()
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+
+
+ def test_increment_existing_counter(self):
+ """
+ Test increment counter. There should NOT be an exception.
+ """
+ oid = ".1.2.3.4.5.6"
+ sws.sw_config_oid_dict[oid] = True
+ sws.sw_config_low_water_in_interval_dict[oid] = 1
+ sws.sw_config_high_water_in_interval_dict[oid] = 10
+
+ loc_agent = "192.168.1.1"
+ stats.agent_counter_dict[loc_agent] = 2
+ stats.oid_counter_dict[oid] = 102
+
+ sv_total_notifications = stats.total_notifications
+ sv_agent_count_dict = stats.agent_counter_dict[loc_agent]
+ sv_oid_count_dict = stats.oid_counter_dict[oid]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], sv_agent_count_dict+1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+1)
+
+ # try again, without agent_counter_dict[loc_agent]
+ del stats.agent_counter_dict[loc_agent]
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except Exception as e:
+ result = False
+ self.assertEqual(result, True, "test_increment_existing_counter")
+ self.assertEqual(stats.total_notifications, sv_total_notifications+2)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 1)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+2)
+
+ # try again, but with stats.total_notifications removed
+ delattr(stats, "total_notifications")
+
+ try:
+ sw.stats_increment_counters(loc_agent, oid)
+ result = True
+ except:
+ result = False
+
+ self.assertEqual(result, True, "stats.total_notifications removed")
+ self.assertEqual(stats.total_notifications, 1)
+ self.assertEqual(stats.agent_counter_dict[loc_agent], 2)
+ self.assertEqual(stats.oid_counter_dict[oid], sv_oid_count_dict+3)
+
def test_sw_storm_active(self):
"""
Test sw_storm_active()
"""
- sw.sw_init()
+ print(f"sw_last_stormwatch_dict_analysis={sws.sw_last_stormwatch_dict_analysis}")
+
# initialize attributes not handled by sw_init()
stats.oid_counter_dict = {}
@@ -154,7 +253,9 @@ class test_cleanup_and_exit(unittest.TestCase):
self.assertTrue(sw.sw_storm_active(loc_agent, loc_oid))
# now force sws.sw_last_stormwatch_dict_analysis to an old value
- sws.sw_last_stormwatch_dict_analysis = int(time.time()) - sws.sw_interval_in_seconds - 20
+ sws.sw_last_stormwatch_dict_analysis = (
+ int(time.time()) - sws.sw_interval_in_seconds - 20
+ )
# and make certain that stats.oid_counter_dict got cleared.
if not hasattr(stats, "oid_counter_dict"):
stats.oid_counter_dict = {}
@@ -164,6 +265,47 @@ class test_cleanup_and_exit(unittest.TestCase):
# .get("abc") != None)
-if __name__ == "__main__":
+ @patch('trapd_stormwatch.ecomp_logger')
+ def test_sw_reset_counter_dict(self, magic_ecomp_logger):
+ """ test sw_reset_counter_dict() """
+
+ loc_agent = "192.168.1.1"
+ loc_oid = ".1.2.3.4.5.6"
+ loc_agent_oid = loc_agent + " " + loc_oid
+
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 2)
+
+ # cases:
+ # for lao in storm_active_dict:
+ # storm_counter_dict[lao] >= high_water_val[lo]
+ # storm_counter_dict[lao] < high_water_val[lo]
+ # storm_counter_dict[lao] < low_water_val[lo]
+ # storm_counter_dict[lao] >= low_water_val[lo]
+
+ with patch.dict(sws.sw_storm_counter_dict, {
+ loc_agent_oid: 20
+ }):
+ # values around the 20 above
+ for high_water_mark in [2, 60]:
+ # values around the 20 above
+ for low_water_mark in [0, 30]:
+ with patch.dict(sws.sw_config_high_water_in_interval_dict, {
+ loc_oid: high_water_mark
+ }):
+ with patch.dict(sws.sw_config_low_water_in_interval_dict, {
+ loc_oid: low_water_mark
+ }):
+ with patch.dict(sws.sw_storm_active_dict, {
+ loc_agent_oid: "anything"
+ }):
+ sv_storm_counter = sws.sw_storm_counter_dict[loc_agent_oid]
+ magic_ecomp_logger.call_count = 0
+ self.assertTrue(sw.sw_reset_counter_dict())
+ self.assertEqual(magic_ecomp_logger.call_count, 3)
+ self.assertEqual(sws.sw_storm_counter_dict[loc_agent_oid], 0)
+
+
+if __name__ == "__main__": # pragma: no cover
# sws.init()
unittest.main()