diff options
Diffstat (limited to 'tests/test_htbtworker.py')
-rw-r--r-- | tests/test_htbtworker.py | 167 |
1 files changed, 89 insertions, 78 deletions
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py index ee03ddb..4066a9f 100644 --- a/tests/test_htbtworker.py +++ b/tests/test_htbtworker.py @@ -18,148 +18,159 @@ import htbtworker import os import tempfile -import json +import json import unittest from unittest.mock import * from _pytest.outcomes import skip -class Test_htbtworker(unittest.TestCase): +class Test_htbtworker(unittest.TestCase): def setUp(self): - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" - @patch('requests.get') - @patch('htbtworker.check_process_reconfiguration', return_value=False) - @patch('htbtworker.get_eventnamelist') - @patch('htbtworker.sql_executor') - def test_process_msg(self, mock1, mock2, mock3, sqlmock1): + @patch("requests.get") + @patch("htbtworker.check_process_reconfiguration", return_value=False) + @patch("htbtworker.get_eventnamelist") + @patch("htbtworker.sql_executor") + def test_process_msg(self, mock1, mock2, mock3, sqlmock1): """ Test to verify event processing using mock TBD - Negative test """ - + status = True - dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}] - - mock_resp = Mock() - mock_resp.configure_mock( - **{ - "text": json.dumps(dmaap_data) - } - ) + dmaap_data = [ + { + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1544608845841, + "sourceId": "VNFB_SRC5", + "eventId": "mvfs10", + "nfcNamingCode": "VNFB", + "timeZoneOffset": "UTC-05:30", + "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234", + "eventType": "platform", + "priority": "Normal", + "version": "4.0.2", + "reportingEntityName": "ibcx0001vm002oam001", + "sequence": 1000, + "domain": "heartbeat", + "lastEpochMicrosec": 1544608845841, + "eventName": "Heartbeat_vFW", + "vesEventListenerVersion": "7.0.2", + "sourceName": "SOURCE_NAME2", + "nfNamingCode": "VNFB", + }, + "heartbeatFields": {"heartbeatInterval": 20, "heartbeatFieldsVersion": "3.0"}, + } + } + ] + + mock_resp = Mock() + mock_resp.configure_mock(**{"text": json.dumps(dmaap_data)}) mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")] - mock1.return_value = mock_resp + mock1.return_value = mock_resp filename = "test-config.json" htbtworker.process_msg(filename, number_of_iterations=1) self.assertEqual(status, True) - def test_parse_event(self): """ test_parse_event() opens the file test1.json and returns attributes """ - filename = (os.path.dirname(__file__))+"/test1.json" - with open(filename,"r") as fp: - data = fp.read() - srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + filename = (os.path.dirname(__file__)) + "/test1.json" + with open(filename, "r") as fp: + data = fp.read() + srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "SOURCE_NAME1") self.assertEqual(event_name, "Heartbeat_vDNS") - filename = (os.path.dirname(__file__))+"/test4.json" - with open(filename,"r") as fp: - data = fp.read() - srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + filename = (os.path.dirname(__file__)) + "/test4.json" + with open(filename, "r") as fp: + data = fp.read() + srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "zalp1bmdns01cmd010") self.assertEqual(event_name, "Heartbeat_vDNS") - - @patch('htbtworker.sql_executor') - def test_create_and_check_vnf2_table (self, mock_settings): + @patch("htbtworker.sql_executor") + def test_create_and_check_vnf2_table(self, mock_settings): """ Test to verify existence of given table """ - mock_cursor = Mock() - mock_cursor.configure_mock( - **{ - "fetchone.return_value": [("vnf_table_2")] - } - ) + mock_cursor = Mock() + mock_cursor.configure_mock(**{"fetchone.return_value": [("vnf_table_2")]}) mock_settings.return_value = mock_cursor - status = htbtworker.check_and_create_vnf2_table () - self.assertEqual(status, True) - - with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): - status = htbtworker.check_and_create_vnf2_table () - self.assertEqual(False, status) - - @patch('htbtworker.sql_executor') - def test_new_vnf_entry (self, sql_mock): + status = htbtworker.check_and_create_vnf2_table() + self.assertEqual(status, True) + + with patch("htbtworker.sql_executor", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + status = htbtworker.check_and_create_vnf2_table() + self.assertEqual(False, status) + + @patch("htbtworker.sql_executor") + def test_new_vnf_entry(self, sql_mock): """ Check to verify if new node entry is made for tracking HB """ status = True - htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) + htbtworker.new_vnf_entry("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) self.assertEqual(status, True) - @patch('htbtworker.sql_executor') - def test_get_eventnamelist (self, sql_mock): + @patch("htbtworker.sql_executor") + def test_get_eventnamelist(self, sql_mock): """ - Test to verify eventname list is returned from vnf_table_1 - TBD - List comparison + Test to verify eventname list is returned from vnf_table_1 + TBD - List comparison """ eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")] - mock_cursor = Mock() - mock_cursor.configure_mock( - **{ - "fetchall.return_value": eventname_list - } - ) + mock_cursor = Mock() + mock_cursor.configure_mock(**{"fetchall.return_value": eventname_list}) sql_mock.return_value = mock_cursor - return_list = htbtworker.get_eventnamelist () - self.assertIn("Heartbeat_vDNS", return_list) - - @patch('htbtworker.postgres_db_open') - def test_sql_executor (self, db_mock): + return_list = htbtworker.get_eventnamelist() + self.assertIn("Heartbeat_vDNS", return_list) + + @patch("htbtworker.postgres_db_open") + def test_sql_executor(self, db_mock): """ - Test sql executor wrapper method + Test sql executor wrapper method """ - htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") - htbtworker.sql_executor ("INSERT into information_schema.tables,") + htbtworker.sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") + htbtworker.sql_executor("INSERT into information_schema.tables,") connection_db = db_mock - with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) - @patch('psycopg2.connect') - def test_postgres_db_open (self, mock): + @patch("psycopg2.connect") + def test_postgres_db_open(self, mock): """ Test wrapper for postgres db connection """ conn = htbtworker.postgres_db_open() self.assertIsNotNone(conn) - @patch('misshtbtd.read_hb_common') - def test_check_process_reconfiguration (self, mock): + @patch("misshtbtd.read_hb_common") + def test_check_process_reconfiguration(self, mock): """ Test if DB is in reconfiguration state """ - mock.return_value = ("1234","RUNNING", "XYZ", 1234) - flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db") + mock.return_value = ("1234", "RUNNING", "XYZ", 1234) + flag = htbtworker.check_process_reconfiguration("test", "test", "x.x.x.x", "1234", "test_db") self.assertEqual(False, flag) - - @patch('htbtworker.postgres_db_open') - def test_commit_and_close_db (self, db_mock): + + @patch("htbtworker.postgres_db_open") + def test_commit_and_close_db(self, db_mock): """ Test commit and close db """ connection_db = db_mock flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(True, flag) - with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) - - -if __name__ == "__main__": # pragma: no cover + + +if __name__ == "__main__": # pragma: no cover unittest.main() |