aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_htbtworker.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_htbtworker.py')
-rw-r--r--tests/test_htbtworker.py167
1 files changed, 89 insertions, 78 deletions
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py
index ee03ddb..4066a9f 100644
--- a/tests/test_htbtworker.py
+++ b/tests/test_htbtworker.py
@@ -18,148 +18,159 @@
import htbtworker
import os
import tempfile
-import json
+import json
import unittest
from unittest.mock import *
from _pytest.outcomes import skip
-class Test_htbtworker(unittest.TestCase):
+class Test_htbtworker(unittest.TestCase):
def setUp(self):
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
- @patch('requests.get')
- @patch('htbtworker.check_process_reconfiguration', return_value=False)
- @patch('htbtworker.get_eventnamelist')
- @patch('htbtworker.sql_executor')
- def test_process_msg(self, mock1, mock2, mock3, sqlmock1):
+ @patch("requests.get")
+ @patch("htbtworker.check_process_reconfiguration", return_value=False)
+ @patch("htbtworker.get_eventnamelist")
+ @patch("htbtworker.sql_executor")
+ def test_process_msg(self, mock1, mock2, mock3, sqlmock1):
"""
Test to verify event processing using mock
TBD - Negative test
"""
-
+
status = True
- dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}]
-
- mock_resp = Mock()
- mock_resp.configure_mock(
- **{
- "text": json.dumps(dmaap_data)
- }
- )
+ dmaap_data = [
+ {
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1544608845841,
+ "sourceId": "VNFB_SRC5",
+ "eventId": "mvfs10",
+ "nfcNamingCode": "VNFB",
+ "timeZoneOffset": "UTC-05:30",
+ "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+ "eventType": "platform",
+ "priority": "Normal",
+ "version": "4.0.2",
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sequence": 1000,
+ "domain": "heartbeat",
+ "lastEpochMicrosec": 1544608845841,
+ "eventName": "Heartbeat_vFW",
+ "vesEventListenerVersion": "7.0.2",
+ "sourceName": "SOURCE_NAME2",
+ "nfNamingCode": "VNFB",
+ },
+ "heartbeatFields": {"heartbeatInterval": 20, "heartbeatFieldsVersion": "3.0"},
+ }
+ }
+ ]
+
+ mock_resp = Mock()
+ mock_resp.configure_mock(**{"text": json.dumps(dmaap_data)})
mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")]
- mock1.return_value = mock_resp
+ mock1.return_value = mock_resp
filename = "test-config.json"
htbtworker.process_msg(filename, number_of_iterations=1)
self.assertEqual(status, True)
-
def test_parse_event(self):
"""
test_parse_event() opens the file test1.json and returns attributes
"""
- filename = (os.path.dirname(__file__))+"/test1.json"
- with open(filename,"r") as fp:
- data = fp.read()
- srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data)
+ filename = (os.path.dirname(__file__)) + "/test1.json"
+ with open(filename, "r") as fp:
+ data = fp.read()
+ srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
self.assertEqual(srcname, "SOURCE_NAME1")
self.assertEqual(event_name, "Heartbeat_vDNS")
- filename = (os.path.dirname(__file__))+"/test4.json"
- with open(filename,"r") as fp:
- data = fp.read()
- srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data)
+ filename = (os.path.dirname(__file__)) + "/test4.json"
+ with open(filename, "r") as fp:
+ data = fp.read()
+ srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
self.assertEqual(srcname, "zalp1bmdns01cmd010")
self.assertEqual(event_name, "Heartbeat_vDNS")
-
- @patch('htbtworker.sql_executor')
- def test_create_and_check_vnf2_table (self, mock_settings):
+ @patch("htbtworker.sql_executor")
+ def test_create_and_check_vnf2_table(self, mock_settings):
"""
Test to verify existence of given table
"""
- mock_cursor = Mock()
- mock_cursor.configure_mock(
- **{
- "fetchone.return_value": [("vnf_table_2")]
- }
- )
+ mock_cursor = Mock()
+ mock_cursor.configure_mock(**{"fetchone.return_value": [("vnf_table_2")]})
mock_settings.return_value = mock_cursor
- status = htbtworker.check_and_create_vnf2_table ()
- self.assertEqual(status, True)
-
- with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
- status = htbtworker.check_and_create_vnf2_table ()
- self.assertEqual(False, status)
-
- @patch('htbtworker.sql_executor')
- def test_new_vnf_entry (self, sql_mock):
+ status = htbtworker.check_and_create_vnf2_table()
+ self.assertEqual(status, True)
+
+ with patch("htbtworker.sql_executor", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ status = htbtworker.check_and_create_vnf2_table()
+ self.assertEqual(False, status)
+
+ @patch("htbtworker.sql_executor")
+ def test_new_vnf_entry(self, sql_mock):
"""
Check to verify if new node entry is made for tracking HB
"""
status = True
- htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1)
+ htbtworker.new_vnf_entry("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1)
self.assertEqual(status, True)
- @patch('htbtworker.sql_executor')
- def test_get_eventnamelist (self, sql_mock):
+ @patch("htbtworker.sql_executor")
+ def test_get_eventnamelist(self, sql_mock):
"""
- Test to verify eventname list is returned from vnf_table_1
- TBD - List comparison
+ Test to verify eventname list is returned from vnf_table_1
+ TBD - List comparison
"""
eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")]
- mock_cursor = Mock()
- mock_cursor.configure_mock(
- **{
- "fetchall.return_value": eventname_list
- }
- )
+ mock_cursor = Mock()
+ mock_cursor.configure_mock(**{"fetchall.return_value": eventname_list})
sql_mock.return_value = mock_cursor
- return_list = htbtworker.get_eventnamelist ()
- self.assertIn("Heartbeat_vDNS", return_list)
-
- @patch('htbtworker.postgres_db_open')
- def test_sql_executor (self, db_mock):
+ return_list = htbtworker.get_eventnamelist()
+ self.assertIn("Heartbeat_vDNS", return_list)
+
+ @patch("htbtworker.postgres_db_open")
+ def test_sql_executor(self, db_mock):
"""
- Test sql executor wrapper method
+ Test sql executor wrapper method
"""
- htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
- htbtworker.sql_executor ("INSERT into information_schema.tables,")
+ htbtworker.sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
+ htbtworker.sql_executor("INSERT into information_schema.tables,")
connection_db = db_mock
- with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(False, flag)
- @patch('psycopg2.connect')
- def test_postgres_db_open (self, mock):
+ @patch("psycopg2.connect")
+ def test_postgres_db_open(self, mock):
"""
Test wrapper for postgres db connection
"""
conn = htbtworker.postgres_db_open()
self.assertIsNotNone(conn)
- @patch('misshtbtd.read_hb_common')
- def test_check_process_reconfiguration (self, mock):
+ @patch("misshtbtd.read_hb_common")
+ def test_check_process_reconfiguration(self, mock):
"""
Test if DB is in reconfiguration state
"""
- mock.return_value = ("1234","RUNNING", "XYZ", 1234)
- flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db")
+ mock.return_value = ("1234", "RUNNING", "XYZ", 1234)
+ flag = htbtworker.check_process_reconfiguration("test", "test", "x.x.x.x", "1234", "test_db")
self.assertEqual(False, flag)
-
- @patch('htbtworker.postgres_db_open')
- def test_commit_and_close_db (self, db_mock):
+
+ @patch("htbtworker.postgres_db_open")
+ def test_commit_and_close_db(self, db_mock):
"""
Test commit and close db
"""
connection_db = db_mock
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(True, flag)
- with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(False, flag)
-
-
-if __name__ == "__main__": # pragma: no cover
+
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()