aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2023-02-15 17:40:36 -0500
committerVijay Venkatesh Kumar <vv770d@att.com>2023-02-22 16:00:41 -0500
commitd2e368fcc6f9aeedec1f036d5b5e3e72e1460b59 (patch)
treee166080b98114b700392e6a8c94911ddf9ccea83 /tests
parent76db0c428b23dfefa6d03b946d1343db0d1a787f (diff)
heartbeat ms Test updates
Handle no-pragma cases Change-Id: I963542e39a59d6210645abed6e30a0cf5fc3159b Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com> Issue-ID: DCAEGEN2-2953 Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/test_db_monitoring.py190
1 files changed, 182 insertions, 8 deletions
diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py
index 5315a66..9d5c3c3 100644
--- a/tests/test_db_monitoring.py
+++ b/tests/test_db_monitoring.py
@@ -23,14 +23,134 @@ import tempfile
import os
import json
import unittest
+import time
+import threading
from unittest.mock import *
from _pytest.outcomes import skip
+from pickle import NONE
_logger = logging.getLogger(__name__)
class Test_db_monitoring(unittest.TestCase):
+
+ class PseudoCursorCase1():
+ # Test setup for RECONFIGURATION state
+ fetchall_1 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_1
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_1 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_1 = [[millisec -300 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_1 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_1 = [["RECONFIGURATION"]]
+ elif command.startswith ("DELETE "):
+ fetchall_1 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_1 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_1
+ return fetchall_1
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase2():
+ # Test setup for RUNNING state and CL Onset condition
+ fetchall_2 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_2
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_2 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_2 = [[millisec -500 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_2 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_2 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_2 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_2 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_2
+ return fetchall_2
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase3():
+ # Test setup for RUNNING state and CL Abatement condition
+ fetchall_3 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_3
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_3 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_3 = [[millisec -20 ,"testnodeA",1]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_3 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_3 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_3 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_3 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_3
+ return fetchall_3
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase4():
+ # Test setup for SourceNode not actively tracked for CL (validity_flag=0)
+ fetchall_4 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_4
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_4 = [[0,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_4 = [[millisec -500 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_4 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_4 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_4 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_4 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_4
+ return fetchall_4
+ @classmethod
+ def close(ctype):
+ pass
+
def setUp(self):
htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
@@ -57,30 +177,84 @@ class Test_db_monitoring(unittest.TestCase):
)
self.assertEqual(status, True)
- @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234))
+ @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ-", 1234))
@patch("htbtworker.postgres_db_open")
- def test_db_monitoring(self, mock1, mock2):
+ @patch("socket.gethostname", return_value = "XYZ")
+ #@patch("db_monitoring.sendControlLoopEvent", return_value = True)
+ def test_db_monitoring(self, mock1, mock2, mock3):
status = True
+
mock_cursor = Mock()
- mock2.cursor.return_value = mock_cursor
+
+ ## Test setup for RECONFIGURATION state
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase1}
+ )
+ mock2.return_value = mock_cursor
+ # Test for outer else when PID doesn't match
db_monitoring.db_monitoring(
- "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
)
self.assertEqual(status, True)
+ # Test for RECONFIGURATION state
+ mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ-", 1234)
db_monitoring.db_monitoring(
- "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
)
self.assertEqual(status, True)
- mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234)
+
+ ## Test for RUNNING state and CL ONSET
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase2}
+ )
+ mock2.return_value = mock_cursor
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
+ )
+ self.assertEqual(status, True)
+
+ ## Test for RUNNING state and CL Abatement condition
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase3}
+ )
+ mock2.return_value = mock_cursor
db_monitoring.db_monitoring(
- "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" , 1
)
self.assertEqual(status, True)
+ ## # Test setup for SourceNode not actively tracked for CL (validity_flag=0)
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase4}
+ )
+ mock2.return_value = mock_cursor
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
+ )
+ self.assertEqual(status, True)
+
+ ### Test for exception with missing json file path
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
+ os.rename(htbtworker.configjsonfile, htbtworker.configjsonfile + "_1")
+ t1 = threading.Thread(target=Test_db_monitoring.switch_filepath, args=(htbtworker.configjsonfile,))
+ t1.start()
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 3
+ )
+ t1.join()
+ self.assertEqual(status, True)
+
+ @classmethod
+ def switch_filepath(ctype, jsonfile):
+ #print (f"ctype: {ctype} type : {type(ctype)}")
+ time.sleep(5)
+ os.rename(htbtworker.configjsonfile+"_1", htbtworker.configjsonfile)
+
+
def test_db_monitoring_wrapper(self):
status = True
db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0)
- self.assertEqual(status, True)
+ self.assertEqual(status, True)
if __name__ == "__main__": # pragma: no cover