diff options
author | Tony Hansen <tony@att.com> | 2023-01-06 13:46:47 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2023-01-06 13:46:47 +0000 |
commit | aefdb1d11b1f36750438487fe1da85911acca4c6 (patch) | |
tree | 1654d20484dfa280781a89069d7920e6b617ccc8 /tests/test_htbtworker.py | |
parent | 7e3d845ca700895cc2dec9a5d3282c21860591d8 (diff) | |
parent | 341b5bb2347c30344662675936b90b325efe5520 (diff) |
Merge "Heartbeat code refactoring"
Diffstat (limited to 'tests/test_htbtworker.py')
-rw-r--r-- | tests/test_htbtworker.py | 163 |
1 files changed, 139 insertions, 24 deletions
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py index 78c9087..ee03ddb 100644 --- a/tests/test_htbtworker.py +++ b/tests/test_htbtworker.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2020-2022 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020-2023 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,37 +14,152 @@ # limitations under the License. # ============LICENSE_END========================================================= -from miss_htbt_service import htbtworker +import htbtworker import os import tempfile -import json +import json +import unittest +from unittest.mock import * +from _pytest.outcomes import skip +class Test_htbtworker(unittest.TestCase): -def run_test(i): - """ - read_json_file() opens the file CWD/prefix/test{j}.json and returns the json value found there - """ - j = i + 1 - tdir = tempfile.TemporaryDirectory() - prefix = "../../../../../../../../../../../../.." - pdir = f"{prefix}{tdir.name}" - fname = f"{tdir.name}/test{j}.json" - with open(fname, "w") as fp: - json.dump({"test": i}, fp) - assert os.path.isfile(f"{tdir.name}/test{j}.json") - assert os.path.isfile(f"{pdir}/test{j}.json") - cfg = htbtworker.read_json_file(i, prefix=pdir) - assert cfg["test"] == i + def setUp(self): + htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + @patch('requests.get') + @patch('htbtworker.check_process_reconfiguration', return_value=False) + @patch('htbtworker.get_eventnamelist') + @patch('htbtworker.sql_executor') + def test_process_msg(self, mock1, mock2, mock3, sqlmock1): + """ + Test to verify event processing using mock + TBD - Negative test + """ + + status = True + dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}] + + mock_resp = Mock() + mock_resp.configure_mock( + **{ + "text": json.dumps(dmaap_data) + } + ) + mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")] + mock1.return_value = mock_resp -def test_read_json_file_0(): - run_test(0) + filename = "test-config.json" + htbtworker.process_msg(filename, number_of_iterations=1) + self.assertEqual(status, True) + + def test_parse_event(self): + """ + test_parse_event() opens the file test1.json and returns attributes + """ + filename = (os.path.dirname(__file__))+"/test1.json" + with open(filename,"r") as fp: + data = fp.read() + srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + self.assertEqual(srcname, "SOURCE_NAME1") + self.assertEqual(event_name, "Heartbeat_vDNS") -def test_read_json_file_1(): - run_test(1) + filename = (os.path.dirname(__file__))+"/test4.json" + with open(filename,"r") as fp: + data = fp.read() + srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + self.assertEqual(srcname, "zalp1bmdns01cmd010") + self.assertEqual(event_name, "Heartbeat_vDNS") -def test_read_json_file_2(): - run_test(2) + @patch('htbtworker.sql_executor') + def test_create_and_check_vnf2_table (self, mock_settings): + """ + Test to verify existence of given table + """ + mock_cursor = Mock() + mock_cursor.configure_mock( + **{ + "fetchone.return_value": [("vnf_table_2")] + } + ) + mock_settings.return_value = mock_cursor + status = htbtworker.check_and_create_vnf2_table () + self.assertEqual(status, True) + + with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + status = htbtworker.check_and_create_vnf2_table () + self.assertEqual(False, status) + + @patch('htbtworker.sql_executor') + def test_new_vnf_entry (self, sql_mock): + """ + Check to verify if new node entry is made for tracking HB + """ + status = True + htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) + self.assertEqual(status, True) + + @patch('htbtworker.sql_executor') + def test_get_eventnamelist (self, sql_mock): + """ + Test to verify eventname list is returned from vnf_table_1 + TBD - List comparison + """ + eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")] + mock_cursor = Mock() + mock_cursor.configure_mock( + **{ + "fetchall.return_value": eventname_list + } + ) + sql_mock.return_value = mock_cursor + return_list = htbtworker.get_eventnamelist () + self.assertIn("Heartbeat_vDNS", return_list) + + @patch('htbtworker.postgres_db_open') + def test_sql_executor (self, db_mock): + """ + Test sql executor wrapper method + """ + htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") + htbtworker.sql_executor ("INSERT into information_schema.tables,") + connection_db = db_mock + with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + flag = htbtworker.commit_and_close_db(connection_db) + self.assertEqual(False, flag) + + @patch('psycopg2.connect') + def test_postgres_db_open (self, mock): + """ + Test wrapper for postgres db connection + """ + conn = htbtworker.postgres_db_open() + self.assertIsNotNone(conn) + + @patch('misshtbtd.read_hb_common') + def test_check_process_reconfiguration (self, mock): + """ + Test if DB is in reconfiguration state + """ + mock.return_value = ("1234","RUNNING", "XYZ", 1234) + flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db") + self.assertEqual(False, flag) + + @patch('htbtworker.postgres_db_open') + def test_commit_and_close_db (self, db_mock): + """ + Test commit and close db + """ + connection_db = db_mock + flag = htbtworker.commit_and_close_db(connection_db) + self.assertEqual(True, flag) + with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + flag = htbtworker.commit_and_close_db(connection_db) + self.assertEqual(False, flag) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() |