# ============LICENSE_START======================================================= # Copyright (c) 2020-2023 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= import htbtworker import os import tempfile import json import unittest from unittest.mock import * from _pytest.outcomes import skip class Test_htbtworker(unittest.TestCase): def setUp(self): htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" @patch('requests.get') @patch('htbtworker.check_process_reconfiguration', return_value=False) @patch('htbtworker.get_eventnamelist') @patch('htbtworker.sql_executor') def test_process_msg(self, mock1, mock2, mock3, sqlmock1): """ Test to verify event processing using mock TBD - Negative test """ status = True dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}] mock_resp = Mock() mock_resp.configure_mock( **{ "text": json.dumps(dmaap_data) } ) mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")] mock1.return_value = mock_resp filename = "test-config.json" htbtworker.process_msg(filename, number_of_iterations=1) self.assertEqual(status, True) def test_parse_event(self): """ test_parse_event() opens the file test1.json and returns attributes """ filename = (os.path.dirname(__file__))+"/test1.json" with open(filename,"r") as fp: data = fp.read() srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "SOURCE_NAME1") self.assertEqual(event_name, "Heartbeat_vDNS") filename = (os.path.dirname(__file__))+"/test4.json" with open(filename,"r") as fp: data = fp.read() srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "zalp1bmdns01cmd010") self.assertEqual(event_name, "Heartbeat_vDNS") @patch('htbtworker.sql_executor') def test_create_and_check_vnf2_table (self, mock_settings): """ Test to verify existence of given table """ mock_cursor = Mock() mock_cursor.configure_mock( **{ "fetchone.return_value": [("vnf_table_2")] } ) mock_settings.return_value = mock_cursor status = htbtworker.check_and_create_vnf2_table () self.assertEqual(status, True) with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): status = htbtworker.check_and_create_vnf2_table () self.assertEqual(False, status) @patch('htbtworker.sql_executor') def test_new_vnf_entry (self, sql_mock): """ Check to verify if new node entry is made for tracking HB """ status = True htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) self.assertEqual(status, True) @patch('htbtworker.sql_executor') def test_get_eventnamelist (self, sql_mock): """ Test to verify eventname list is returned from vnf_table_1 TBD - List comparison """ eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")] mock_cursor = Mock() mock_cursor.configure_mock( **{ "fetchall.return_value": eventname_list } ) sql_mock.return_value = mock_cursor return_list = htbtworker.get_eventnamelist () self.assertIn("Heartbeat_vDNS", return_list) @patch('htbtworker.postgres_db_open') def test_sql_executor (self, db_mock): """ Test sql executor wrapper method """ htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") htbtworker.sql_executor ("INSERT into information_schema.tables,") connection_db = db_mock with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) @patch('psycopg2.connect') def test_postgres_db_open (self, mock): """ Test wrapper for postgres db connection """ conn = htbtworker.postgres_db_open() self.assertIsNotNone(conn) @patch('misshtbtd.read_hb_common') def test_check_process_reconfiguration (self, mock): """ Test if DB is in reconfiguration state """ mock.return_value = ("1234","RUNNING", "XYZ", 1234) flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db") self.assertEqual(False, flag) @patch('htbtworker.postgres_db_open') def test_commit_and_close_db (self, db_mock): """ Test commit and close db """ connection_db = db_mock flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(True, flag) with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) if __name__ == "__main__": # pragma: no cover unittest.main()