From d2e368fcc6f9aeedec1f036d5b5e3e72e1460b59 Mon Sep 17 00:00:00 2001 From: Vijay Venkatesh Kumar Date: Wed, 15 Feb 2023 17:40:36 -0500 Subject: heartbeat ms Test updates Handle no-pragma cases Change-Id: I963542e39a59d6210645abed6e30a0cf5fc3159b Signed-off-by: Vijay Venkatesh Kumar Issue-ID: DCAEGEN2-2953 Signed-off-by: Vijay Venkatesh Kumar --- tests/test_db_monitoring.py | 190 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 182 insertions(+), 8 deletions(-) (limited to 'tests') diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py index 5315a66..9d5c3c3 100644 --- a/tests/test_db_monitoring.py +++ b/tests/test_db_monitoring.py @@ -23,14 +23,134 @@ import tempfile import os import json import unittest +import time +import threading from unittest.mock import * from _pytest.outcomes import skip +from pickle import NONE _logger = logging.getLogger(__name__) class Test_db_monitoring(unittest.TestCase): + + class PseudoCursorCase1(): + # Test setup for RECONFIGURATION state + fetchall_1 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_1 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_1 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_1 = [[millisec -300 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_1 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_1 = [["RECONFIGURATION"]] + elif command.startswith ("DELETE "): + fetchall_1 = None + elif command.startswith ("UPDATE"): + fetchall_1 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_1 + return fetchall_1 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase2(): + # Test setup for RUNNING state and CL Onset condition + fetchall_2 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_2 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_2 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_2 = [[millisec -500 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_2 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_2 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_2 = None + elif command.startswith ("UPDATE"): + fetchall_2 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_2 + return fetchall_2 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase3(): + # Test setup for RUNNING state and CL Abatement condition + fetchall_3 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_3 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_3 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_3 = [[millisec -20 ,"testnodeA",1]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_3 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_3 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_3 = None + elif command.startswith ("UPDATE"): + fetchall_3 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_3 + return fetchall_3 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase4(): + # Test setup for SourceNode not actively tracked for CL (validity_flag=0) + fetchall_4 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_4 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_4 = [[0,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_4 = [[millisec -500 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_4 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_4 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_4 = None + elif command.startswith ("UPDATE"): + fetchall_4 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_4 + return fetchall_4 + @classmethod + def close(ctype): + pass + def setUp(self): htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" @@ -57,30 +177,84 @@ class Test_db_monitoring(unittest.TestCase): ) self.assertEqual(status, True) - @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234)) + @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ-", 1234)) @patch("htbtworker.postgres_db_open") - def test_db_monitoring(self, mock1, mock2): + @patch("socket.gethostname", return_value = "XYZ") + #@patch("db_monitoring.sendControlLoopEvent", return_value = True) + def test_db_monitoring(self, mock1, mock2, mock3): status = True + mock_cursor = Mock() - mock2.cursor.return_value = mock_cursor + + ## Test setup for RECONFIGURATION state + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase1} + ) + mock2.return_value = mock_cursor + # Test for outer else when PID doesn't match db_monitoring.db_monitoring( - "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 ) self.assertEqual(status, True) + # Test for RECONFIGURATION state + mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ-", 1234) db_monitoring.db_monitoring( - "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 ) self.assertEqual(status, True) - mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234) + + ## Test for RUNNING state and CL ONSET + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase2} + ) + mock2.return_value = mock_cursor + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 + ) + self.assertEqual(status, True) + + ## Test for RUNNING state and CL Abatement condition + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase3} + ) + mock2.return_value = mock_cursor db_monitoring.db_monitoring( - "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" , 1 ) self.assertEqual(status, True) + ## # Test setup for SourceNode not actively tracked for CL (validity_flag=0) + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase4} + ) + mock2.return_value = mock_cursor + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 + ) + self.assertEqual(status, True) + + ### Test for exception with missing json file path + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" + os.rename(htbtworker.configjsonfile, htbtworker.configjsonfile + "_1") + t1 = threading.Thread(target=Test_db_monitoring.switch_filepath, args=(htbtworker.configjsonfile,)) + t1.start() + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 3 + ) + t1.join() + self.assertEqual(status, True) + + @classmethod + def switch_filepath(ctype, jsonfile): + #print (f"ctype: {ctype} type : {type(ctype)}") + time.sleep(5) + os.rename(htbtworker.configjsonfile+"_1", htbtworker.configjsonfile) + + def test_db_monitoring_wrapper(self): status = True db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0) - self.assertEqual(status, True) + self.assertEqual(status, True) if __name__ == "__main__": # pragma: no cover -- cgit 1.2.3-korg