diff options
-rw-r--r-- | Changelog.md | 1 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 17 | ||||
-rw-r--r-- | miss_htbt_service/htbtworker.py | 138 | ||||
-rw-r--r-- | miss_htbt_service/misshtbtd.py | 5 | ||||
-rw-r--r-- | tests/test_db_monitoring.py | 60 | ||||
-rw-r--r-- | tests/test_htbt_get_cbs_config.py | 9 | ||||
-rw-r--r-- | tests/test_htbt_vnf_table.py | 36 | ||||
-rw-r--r-- | tests/test_htbtworker.py | 167 | ||||
-rw-r--r-- | tests/test_misshtbtd.py | 232 |
9 files changed, 363 insertions, 302 deletions
diff --git a/Changelog.md b/Changelog.md index bd2b18f..014d076 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [2.6.0] - 2022/11/17 - [DCAEGEN2-2953] Code refactoring +- [DCAEGEN2-3321] Fix black reported formatting issues ## [2.5.0] - 2022/10/25 - [DCAEGEN2-2952] Handle exception when MR is unavailable diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index cbd8f24..0d27a95 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -176,7 +176,9 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) connection_db = pm.postgres_db_open() cur = connection_db.cursor() - if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING": # pragma: no cover + if ( + int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING" + ): # pragma: no cover _logger.info("DBM: Active DB Monitoring Instance") cur.execute("SELECT event_name FROM vnf_table_1") vnf_list = [item[0] for item in cur.fetchall()] @@ -221,7 +223,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ epoc_time_sec = row[0][0] src_name = row[0][1] cl_flag = row[0][2] - if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover + if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover sendControlLoopEvent( "ONSET", pol_url, @@ -241,7 +243,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ (cl_flag, event_name, (source_name_key + 1)), ) connection_db.commit() - elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover + elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover sendControlLoopEvent( "ABATED", pol_url, @@ -274,7 +276,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ """ else: # pragma: no cover msg = "DBM:Inactive instance or hb_common state is not RUNNING" - _logger.info(msg) + _logger.info(msg) try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() @@ -282,11 +284,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ except psycopg2.DatabaseError as e: msg = "COMMON:Error %s" % e _logger.error(msg) - return False + return False cur.close() break -def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1): + +def db_monitoring_wrapper(current_pid, jsfile, number_of_iterations=-1): get_logger.configure_logger("db_monitoring") _logger.info("DBM: DBM Process started") ( @@ -308,4 +311,4 @@ def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1): if __name__ == "__main__": current_pid = sys.argv[1] jsfile = sys.argv[2] - db_monitoring_wrapper (current_pid,jsfile) + db_monitoring_wrapper(current_pid, jsfile) diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index 3328973..22155a3 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -35,13 +35,14 @@ _logger = logging.getLogger(__name__) configjsonfile = "../etc/config.json" + def process_msg(cfgjsonfile, number_of_iterations=-1): """ - Function to poll events from MR continuously - and determine if matching configuration to be - tracked for missed HB function + Function to poll events from MR continuously + and determine if matching configuration to be + tracked for missed HB function """ - + global mr_url global configjsonfile configjsonfile = cfgjsonfile @@ -50,21 +51,21 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): while number_of_iterations != 0: number_of_iterations -= 1 time.sleep(sleep_duration) - print ("*** CFG json file info " + configjsonfile) + print("*** CFG json file info " + configjsonfile) with open(configjsonfile, "r") as outfile: cfg = json.load(outfile) - - print ("*** CFG info " + str(cfg)) + + print("*** CFG info " + str(cfg)) mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) - + username = str(cfg["pg_userName"]) password = str(cfg["pg_passwd"]) db_host = str(cfg["pg_ipAddress"]) db_port = cfg["pg_portNum"] database_name = str(cfg["pg_dbName"]) - - reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name) - eventname_list = get_eventnamelist () + + reconfig_Flag = check_process_reconfiguration(username, password, db_host, db_port, database_name) + eventname_list = get_eventnamelist() if "groupID" not in os.environ or "consumerID" not in os.environ: get_url = mr_url + "/DefaultGroup/1?timeout=15000" @@ -86,7 +87,7 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): if "mrstatus" in input_string: continue jlist = input_string.split("\n") - print (jlist) + print(jlist) # Process the DMaaP input message retrieved error = False jobj = [] @@ -102,15 +103,15 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): continue if len(jobj) == 0: continue - + _logger.info("HBT jobj Array : %s", jobj) for item in jobj: srcname, lastepo, seqnum, event_name = parse_event(item) msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - - check_and_create_vnf2_table () - + + check_and_create_vnf2_table() + if event_name in eventname_list: # pragma: no cover cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name) row = cur.fetchone() @@ -120,13 +121,14 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag) - else: # pragma: no cover + else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): cur = sql_executor( "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", - event_name, (source_name_key + 1) + event_name, + (source_name_key + 1), ) row = cur.fetchall() if len(row) == 0: @@ -138,7 +140,10 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): sql_executor( "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - lastepo, srcname, event_name, (source_name_key + 1) + lastepo, + srcname, + event_name, + (source_name_key + 1), ) source_name_key = source_name_count break @@ -153,32 +158,30 @@ def process_msg(cfgjsonfile, number_of_iterations=-1): else: _logger.info("HBT:eventName is not being monitored, Ignoring JSON message") msg = "HBT: Looping to check for new events from DMAAP" - print ("HBT: Looping to check for new events from DMAAP") + print("HBT: Looping to check for new events from DMAAP") _logger.info(msg) -def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag): - """ - Wrapper function to update event to tracking tables - """ - - sql_executor( - "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - event_name, source_name_key, lastepo, srcname, cl_flag - ) - sql_executor( - "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - source_name_key, event_name - ) - -def parse_event (jsonstring): + +def new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag): + """ + Wrapper function to update event to tracking tables + """ + + sql_executor( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", event_name, source_name_key, lastepo, srcname, cl_flag + ) + sql_executor("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", source_name_key, event_name) + + +def parse_event(jsonstring): """ - Function to parse incoming event as json object + Function to parse incoming event as json object and parse out required attributes """ - _logger.info("HBT jsonstring: %s", jsonstring) - #convert string to object + _logger.info("HBT jsonstring: %s", jsonstring) + # convert string to object jitem = json.loads(jsonstring) - + try: srcname = jitem["event"]["commonEventHeader"]["sourceName"] lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] @@ -189,22 +192,26 @@ def parse_event (jsonstring): event_name = jitem["event"]["commonEventHeader"]["eventName"] msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname _logger.info(msg) - return (srcname,lastepo,seqnum,event_name) + return (srcname, lastepo, seqnum, event_name) except Exception as err: msg = "HBT message process error - ", err _logger.error(msg) -def get_eventnamelist (): + +def get_eventnamelist(): """ Function to fetch active monitored eventname list """ - cur = sql_executor("SELECT event_name FROM vnf_table_1",) + cur = sql_executor( + "SELECT event_name FROM vnf_table_1", + ) eventname_list = [item[0] for item in cur.fetchall()] msg = "\n\nHBT:eventnameList values ", eventname_list _logger.info(msg) return eventname_list -def check_and_create_vnf2_table (): + +def check_and_create_vnf2_table(): """ Check and create vnf_table_2 used for tracking HB entries """ @@ -215,7 +222,7 @@ def check_and_create_vnf2_table (): if database_names is not None: if "vnf_table_2" in database_names: table_exist = True - + if table_exist == False: msg = "HBT:Creating vnf_table_2" _logger.info(msg) @@ -228,8 +235,8 @@ def check_and_create_vnf2_table (): LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer - )""" - ,) + )""", + ) else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) @@ -238,15 +245,14 @@ def check_and_create_vnf2_table (): msg = "COMMON:Error %s" % e _logger.error(msg) return False - + + def check_process_reconfiguration(username, password, host, port, database_name): """ Verify if DB configuration in progress """ while True: - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - username, password, host, port, database_name - ) + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(username, password, host, port, database_name) if hbc_state == "RECONFIGURATION": _logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING") time.sleep(10) @@ -255,12 +261,13 @@ def check_process_reconfiguration(username, password, host, port, database_name) break return False + def postgres_db_open(): """ - Wrapper function for returning DB connection object + Wrapper function for returning DB connection object """ - - global configjsonfile + + global configjsonfile ( ip_address, @@ -275,22 +282,22 @@ def postgres_db_open(): return connection -def sql_executor (query, *values): +def sql_executor(query, *values): """ wrapper method for DB operation - """ - _logger.info("HBT query: %s values: %s", query, values) - + """ + _logger.info("HBT query: %s values: %s", query, values) + connection_db = postgres_db_open() cur = connection_db.cursor() - cur.execute(query, values) + cur.execute(query, values) update_commands = ["CREATE", "INSERT", "UPDATE"] - if any (x in query for x in update_commands): - try: - connection_db.commit() # <--- makes sure the change is shown in the database - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) + if any(x in query for x in update_commands): + try: + connection_db.commit() # <--- makes sure the change is shown in the database + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) return cur @@ -310,7 +317,6 @@ if __name__ == "__main__": configjsonfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") - msg = "HBT:The config file name passed is -%s", configjsonfile + msg = "HBT:The config file name passed is -%s", configjsonfile _logger.info(msg) process_msg(configjsonfile) - diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py index 23d4835..1b96f38 100644 --- a/miss_htbt_service/misshtbtd.py +++ b/miss_htbt_service/misshtbtd.py @@ -126,6 +126,7 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password, heartbeat.commit_and_close_db(connection_db) cur.close() + def db_table_creation_check(connection_db, table_name): cur = connection_db.cursor() try: @@ -141,7 +142,8 @@ def db_table_creation_check(connection_db, table_name): _logger.error(msg) finally: cur.close() - + + def create_update_vnf_table_1(jsfile, update_db, connection_db): with open(jsfile, "r") as outfile: cfg = json.load(outfile) @@ -375,7 +377,6 @@ def create_process(job_list, jsfile, pid_current): return job_list - def main(): get_logger.configure_logger("misshtbtd") pid_current = os.getpid() diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py index b9a644e..5315a66 100644 --- a/tests/test_db_monitoring.py +++ b/tests/test_db_monitoring.py @@ -21,49 +21,60 @@ import logging import requests import tempfile import os -import json +import json import unittest from unittest.mock import * from _pytest.outcomes import skip _logger = logging.getLogger(__name__) + + class Test_db_monitoring(unittest.TestCase): - def setUp(self): - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" - - @patch('requests.post') + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" + + @patch("requests.post") def test_sendControlLoopEvent(self, mock1): status = True - mock_resp = Mock() - mock_resp.configure_mock( - **{ - "status_code": 200 - } - ) - mock1.return_value = mock_resp - db_monitoring.sendControlLoopEvent("ONSET", "ABC","1.0","vFW","vFW","VNF","NODE","1234567890","VFW","1.0","DCAE") + mock_resp = Mock() + mock_resp.configure_mock(**{"status_code": 200}) + mock1.return_value = mock_resp + db_monitoring.sendControlLoopEvent( + "ONSET", "ABC", "1.0", "vFW", "vFW", "VNF", "NODE", "1234567890", "VFW", "1.0", "DCAE" + ) self.assertEqual(status, True) - db_monitoring.sendControlLoopEvent("ONSET", "ABC","1.0","vFW","vFW","VM","NODE","1234567890","VFW","1.0","DCAE") + db_monitoring.sendControlLoopEvent( + "ONSET", "ABC", "1.0", "vFW", "vFW", "VM", "NODE", "1234567890", "VFW", "1.0", "DCAE" + ) self.assertEqual(status, True) - db_monitoring.sendControlLoopEvent("ABATED", "ABC","1.0","vFW","vFW","VNF","NODE","1234567890","VFW","1.0","DCAE") + db_monitoring.sendControlLoopEvent( + "ABATED", "ABC", "1.0", "vFW", "vFW", "VNF", "NODE", "1234567890", "VFW", "1.0", "DCAE" + ) self.assertEqual(status, True) - db_monitoring.sendControlLoopEvent("ABATED", "ABC","1.0","vFW","vFW","VM","NODE","1234567890","VFW","1.0","DCAE") + db_monitoring.sendControlLoopEvent( + "ABATED", "ABC", "1.0", "vFW", "vFW", "VM", "NODE", "1234567890", "VFW", "1.0", "DCAE" + ) self.assertEqual(status, True) - @patch('misshtbtd.read_hb_common',return_value = ("1234","RUNNING", "XYZ", 1234)) - @patch('htbtworker.postgres_db_open') + @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234)) + @patch("htbtworker.postgres_db_open") def test_db_monitoring(self, mock1, mock2): status = True - mock_cursor = Mock() + mock_cursor = Mock() mock2.cursor.return_value = mock_cursor - db_monitoring.db_monitoring("111",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name") + db_monitoring.db_monitoring( + "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + ) self.assertEqual(status, True) - db_monitoring.db_monitoring("1234",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name") + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + ) self.assertEqual(status, True) - mock1.cursor.return_value = ("1234","RECONFIGURATION", "XYZ", 1234) - db_monitoring.db_monitoring("1234",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name") + mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234) + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + ) self.assertEqual(status, True) def test_db_monitoring_wrapper(self): @@ -71,5 +82,6 @@ class Test_db_monitoring(unittest.TestCase): db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0) self.assertEqual(status, True) -if __name__ == "__main__": # pragma: no cover + +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_htbt_get_cbs_config.py b/tests/test_htbt_get_cbs_config.py index 99e347c..2a2b641 100644 --- a/tests/test_htbt_get_cbs_config.py +++ b/tests/test_htbt_get_cbs_config.py @@ -105,7 +105,7 @@ class test_get_cbs_config(unittest.TestCase): ) # create copy of snmphtbt.json for pytest - #pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json" + # pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json" pytest_json_config = "test-config.json" with open(pytest_json_config, "w") as outfile: outfile.write(pytest_json_data) @@ -130,19 +130,18 @@ class test_get_cbs_config(unittest.TestCase): print("result: %s" % result) self.assertEqual(result, True) - @patch('misshtbtd.create_update_hb_common') - @patch('misshtbtd.read_hb_common') + @patch("misshtbtd.create_update_hb_common") + @patch("misshtbtd.read_hb_common") def test_poll_cbs(self, mock1, mock2): """ TBD """ status = True current_time = round(time.time()) - mock1.return_value = ('1', 'RUNNING', 'AA', current_time) + mock1.return_value = ("1", "RUNNING", "AA", current_time) # configjsonfile = (os.path.dirname(__file__))+"/test-config.json" configjsonfile = "test-config.json" os.environ.update(CBS_HTBT_JSON=configjsonfile) os.environ["pytest"] = "test" cp.poll_cbs(1) self.assertEqual(status, True) - diff --git a/tests/test_htbt_vnf_table.py b/tests/test_htbt_vnf_table.py index 4b6c3f1..e425766 100644 --- a/tests/test_htbt_vnf_table.py +++ b/tests/test_htbt_vnf_table.py @@ -52,39 +52,39 @@ class test_vnf_tables(unittest.TestCase): global ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties() - @patch('htbtworker.postgres_db_open') - @patch('misshtbtd.db_table_creation_check', return_value=True) + @patch("htbtworker.postgres_db_open") + @patch("misshtbtd.db_table_creation_check", return_value=True) def test_validate_vnf_table_1(self, mock, mock1): result = verify_DB_creation_1(user_name, password, ip_address, port_num, db_name) self.assertEqual(result, True) - @patch('htbtworker.postgres_db_open') - @patch('misshtbtd.db_table_creation_check', return_value=True) + @patch("htbtworker.postgres_db_open") + @patch("misshtbtd.db_table_creation_check", return_value=True) def test_validate_vnf_table_2(self, mock, mock1): result = verify_DB_creation_2(user_name, password, ip_address, port_num, db_name) self.assertEqual(result, True) - @patch('htbtworker.postgres_db_open') - @patch('misshtbtd.db_table_creation_check', return_value=True) + @patch("htbtworker.postgres_db_open") + @patch("misshtbtd.db_table_creation_check", return_value=True) def test_validate_hb_common(self, mock, mock1): result = verify_DB_creation_hb_common() self.assertEqual(result, True) - - @patch('cbs_polling.poll_cbs') + + @patch("cbs_polling.poll_cbs") def test_cbspolling(self, mock): # Check if no exception thrown verify_cbspolling() - @patch('misshtbtd.fetch_json_file') + @patch("misshtbtd.fetch_json_file") def test_fetch_json_file(self, mock1): - configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" mock1.return_value = configjsonfile - + result = verify_fetch_json_file() _logger.info(result) self.assertEqual(result, True) - @patch('misshtbtd.main') + @patch("misshtbtd.main") def test_misshtbtdmain(self, mock): result = verify_misshtbtdmain() _logger.info(result) @@ -95,14 +95,14 @@ class test_vnf_tables(unittest.TestCase): _logger.info(result) self.assertEqual(result, True) - @patch('misshtbtd.fetch_json_file') - @patch('misshtbtd.read_hb_common') - @patch('db_monitoring.db_monitoring') + @patch("misshtbtd.fetch_json_file") + @patch("misshtbtd.read_hb_common") + @patch("db_monitoring.db_monitoring") def test_dbmonitoring(self, mock1, mock2, mock3): - configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" mock1.return_value = configjsonfile - mock2.return_value = ("1234","RUNNING", "XYZ", 1234) - + mock2.return_value = ("1234", "RUNNING", "XYZ", 1234) + result = verify_dbmonitoring() _logger.info(result) self.assertEqual(result, True) diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py index ee03ddb..4066a9f 100644 --- a/tests/test_htbtworker.py +++ b/tests/test_htbtworker.py @@ -18,148 +18,159 @@ import htbtworker import os import tempfile -import json +import json import unittest from unittest.mock import * from _pytest.outcomes import skip -class Test_htbtworker(unittest.TestCase): +class Test_htbtworker(unittest.TestCase): def setUp(self): - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" - @patch('requests.get') - @patch('htbtworker.check_process_reconfiguration', return_value=False) - @patch('htbtworker.get_eventnamelist') - @patch('htbtworker.sql_executor') - def test_process_msg(self, mock1, mock2, mock3, sqlmock1): + @patch("requests.get") + @patch("htbtworker.check_process_reconfiguration", return_value=False) + @patch("htbtworker.get_eventnamelist") + @patch("htbtworker.sql_executor") + def test_process_msg(self, mock1, mock2, mock3, sqlmock1): """ Test to verify event processing using mock TBD - Negative test """ - + status = True - dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}] - - mock_resp = Mock() - mock_resp.configure_mock( - **{ - "text": json.dumps(dmaap_data) - } - ) + dmaap_data = [ + { + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1544608845841, + "sourceId": "VNFB_SRC5", + "eventId": "mvfs10", + "nfcNamingCode": "VNFB", + "timeZoneOffset": "UTC-05:30", + "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234", + "eventType": "platform", + "priority": "Normal", + "version": "4.0.2", + "reportingEntityName": "ibcx0001vm002oam001", + "sequence": 1000, + "domain": "heartbeat", + "lastEpochMicrosec": 1544608845841, + "eventName": "Heartbeat_vFW", + "vesEventListenerVersion": "7.0.2", + "sourceName": "SOURCE_NAME2", + "nfNamingCode": "VNFB", + }, + "heartbeatFields": {"heartbeatInterval": 20, "heartbeatFieldsVersion": "3.0"}, + } + } + ] + + mock_resp = Mock() + mock_resp.configure_mock(**{"text": json.dumps(dmaap_data)}) mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")] - mock1.return_value = mock_resp + mock1.return_value = mock_resp filename = "test-config.json" htbtworker.process_msg(filename, number_of_iterations=1) self.assertEqual(status, True) - def test_parse_event(self): """ test_parse_event() opens the file test1.json and returns attributes """ - filename = (os.path.dirname(__file__))+"/test1.json" - with open(filename,"r") as fp: - data = fp.read() - srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + filename = (os.path.dirname(__file__)) + "/test1.json" + with open(filename, "r") as fp: + data = fp.read() + srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "SOURCE_NAME1") self.assertEqual(event_name, "Heartbeat_vDNS") - filename = (os.path.dirname(__file__))+"/test4.json" - with open(filename,"r") as fp: - data = fp.read() - srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data) + filename = (os.path.dirname(__file__)) + "/test4.json" + with open(filename, "r") as fp: + data = fp.read() + srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data) self.assertEqual(srcname, "zalp1bmdns01cmd010") self.assertEqual(event_name, "Heartbeat_vDNS") - - @patch('htbtworker.sql_executor') - def test_create_and_check_vnf2_table (self, mock_settings): + @patch("htbtworker.sql_executor") + def test_create_and_check_vnf2_table(self, mock_settings): """ Test to verify existence of given table """ - mock_cursor = Mock() - mock_cursor.configure_mock( - **{ - "fetchone.return_value": [("vnf_table_2")] - } - ) + mock_cursor = Mock() + mock_cursor.configure_mock(**{"fetchone.return_value": [("vnf_table_2")]}) mock_settings.return_value = mock_cursor - status = htbtworker.check_and_create_vnf2_table () - self.assertEqual(status, True) - - with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): - status = htbtworker.check_and_create_vnf2_table () - self.assertEqual(False, status) - - @patch('htbtworker.sql_executor') - def test_new_vnf_entry (self, sql_mock): + status = htbtworker.check_and_create_vnf2_table() + self.assertEqual(status, True) + + with patch("htbtworker.sql_executor", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + status = htbtworker.check_and_create_vnf2_table() + self.assertEqual(False, status) + + @patch("htbtworker.sql_executor") + def test_new_vnf_entry(self, sql_mock): """ Check to verify if new node entry is made for tracking HB """ status = True - htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) + htbtworker.new_vnf_entry("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1) self.assertEqual(status, True) - @patch('htbtworker.sql_executor') - def test_get_eventnamelist (self, sql_mock): + @patch("htbtworker.sql_executor") + def test_get_eventnamelist(self, sql_mock): """ - Test to verify eventname list is returned from vnf_table_1 - TBD - List comparison + Test to verify eventname list is returned from vnf_table_1 + TBD - List comparison """ eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")] - mock_cursor = Mock() - mock_cursor.configure_mock( - **{ - "fetchall.return_value": eventname_list - } - ) + mock_cursor = Mock() + mock_cursor.configure_mock(**{"fetchall.return_value": eventname_list}) sql_mock.return_value = mock_cursor - return_list = htbtworker.get_eventnamelist () - self.assertIn("Heartbeat_vDNS", return_list) - - @patch('htbtworker.postgres_db_open') - def test_sql_executor (self, db_mock): + return_list = htbtworker.get_eventnamelist() + self.assertIn("Heartbeat_vDNS", return_list) + + @patch("htbtworker.postgres_db_open") + def test_sql_executor(self, db_mock): """ - Test sql executor wrapper method + Test sql executor wrapper method """ - htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") - htbtworker.sql_executor ("INSERT into information_schema.tables,") + htbtworker.sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2") + htbtworker.sql_executor("INSERT into information_schema.tables,") connection_db = db_mock - with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) - @patch('psycopg2.connect') - def test_postgres_db_open (self, mock): + @patch("psycopg2.connect") + def test_postgres_db_open(self, mock): """ Test wrapper for postgres db connection """ conn = htbtworker.postgres_db_open() self.assertIsNotNone(conn) - @patch('misshtbtd.read_hb_common') - def test_check_process_reconfiguration (self, mock): + @patch("misshtbtd.read_hb_common") + def test_check_process_reconfiguration(self, mock): """ Test if DB is in reconfiguration state """ - mock.return_value = ("1234","RUNNING", "XYZ", 1234) - flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db") + mock.return_value = ("1234", "RUNNING", "XYZ", 1234) + flag = htbtworker.check_process_reconfiguration("test", "test", "x.x.x.x", "1234", "test_db") self.assertEqual(False, flag) - - @patch('htbtworker.postgres_db_open') - def test_commit_and_close_db (self, db_mock): + + @patch("htbtworker.postgres_db_open") + def test_commit_and_close_db(self, db_mock): """ Test commit and close db """ connection_db = db_mock flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(True, flag) - with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): + with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())): flag = htbtworker.commit_and_close_db(connection_db) self.assertEqual(False, flag) - - -if __name__ == "__main__": # pragma: no cover + + +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/tests/test_misshtbtd.py b/tests/test_misshtbtd.py index 0fc8a24..0374a6b 100644 --- a/tests/test_misshtbtd.py +++ b/tests/test_misshtbtd.py @@ -20,196 +20,224 @@ import time import psycopg2 import os import tempfile -import json +import json import unittest from unittest.mock import * from _pytest.outcomes import skip from pickle import FALSE -class Test_misshtbtd(unittest.TestCase): +class Test_misshtbtd(unittest.TestCase): def setUp(self): - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" - @patch ('psycopg2.connect') + @patch("psycopg2.connect") def test_create_database(self, mock1): status = True - mock_cursor = MagicMock() - mock_cursor.configure_mock( - **{ - "fetchone.return_value": [("1")] - } + mock_cursor = MagicMock() + mock_cursor.configure_mock(**{"fetchone.return_value": [("1")]}) + mock1.return_value = mock_cursor + misshtbtd.create_database( + "vnf_table_1", htbtworker.configjsonfile, "10.0.0.0", "1234", "testuser", "testpwd", "heartbeatdb" ) - mock1.return_value = mock_cursor - misshtbtd.create_database("vnf_table_1",htbtworker.configjsonfile, "10.0.0.0","1234","testuser","testpwd","heartbeatdb") self.assertEqual(status, True) - - @patch ('htbtworker.postgres_db_open') + + @patch("htbtworker.postgres_db_open") def test_read_hb_common(self, mock1): - - mock_cursor = Mock() + + mock_cursor = Mock() mock_cursor.configure_mock( - **{ - "cursor.return_value.fetchall.return_value": [["1", "AA", "123456789", "RUNNING"]] - } + **{"cursor.return_value.fetchall.return_value": [["1", "AA", "123456789", "RUNNING"]]} ) mock1.return_value = mock_cursor - self.assertEqual(('1', 'RUNNING', 'AA', '123456789'), misshtbtd.read_hb_common("testuser","testpwd","10.0.0.0","1234","heartbeatdb")) + self.assertEqual( + ("1", "RUNNING", "AA", "123456789"), + misshtbtd.read_hb_common("testuser", "testpwd", "10.0.0.0", "1234", "heartbeatdb"), + ) - @patch ('misshtbtd.db_table_creation_check') - @patch ('htbtworker.postgres_db_open') + @patch("misshtbtd.db_table_creation_check") + @patch("htbtworker.postgres_db_open") def test_create_update_hb_common(self, mock1, mock2): - ''' + """ TODO: argument ordering TBD - ''' - mock_cursor = Mock() - mock1.return_value = mock_cursor + """ + mock_cursor = Mock() + mock1.return_value = mock_cursor mock2.return_value = True status = True - misshtbtd.create_update_hb_common (0,111, "RUNNING", "testuser","testpwd","10.0.0.0","1234","testdb") + misshtbtd.create_update_hb_common(0, 111, "RUNNING", "testuser", "testpwd", "10.0.0.0", "1234", "testdb") self.assertEqual(status, True) mock2.return_value = False - misshtbtd.create_update_hb_common (1,111, "RUNNING", "testuser","testpwd","10.0.0.0","1234","testdb") + misshtbtd.create_update_hb_common(1, 111, "RUNNING", "testuser", "testpwd", "10.0.0.0", "1234", "testdb") self.assertEqual(status, True) - def test_db_table_creation_check (self): + def test_db_table_creation_check(self): """ Test to verify existence of given table """ - mock_cursor = Mock() - mock_cursor.configure_mock( - **{ - "cursor.return_value.fetchone.return_value": ("vnf_table_2") - } - ) - status = misshtbtd.db_table_creation_check (mock_cursor,"vnf_table_2") - self.assertEqual(status, True) + mock_cursor = Mock() + mock_cursor.configure_mock(**{"cursor.return_value.fetchone.return_value": ("vnf_table_2")}) + status = misshtbtd.db_table_creation_check(mock_cursor, "vnf_table_2") + self.assertEqual(status, True) - @patch ('misshtbtd.db_table_creation_check') - def test_create_update_vnf_table_1 (self, mock1): + @patch("misshtbtd.db_table_creation_check") + def test_create_update_vnf_table_1(self, mock1): """ TBD """ status = True - mock_cursor = Mock() + mock_cursor = Mock() mock_cursor.configure_mock( - **{ - "cursor.return_value.fetchall.return_value": [["Heartbeat_vDNS"],["Heartbeat_vFW"]] - } + **{"cursor.return_value.fetchall.return_value": [["Heartbeat_vDNS"], ["Heartbeat_vFW"]]} ) - misshtbtd.create_update_vnf_table_1 (htbtworker.configjsonfile,1, mock_cursor) - self.assertEqual(status, True) - misshtbtd.create_update_vnf_table_1 (htbtworker.configjsonfile,0, mock_cursor) - self.assertEqual(status, True) + misshtbtd.create_update_vnf_table_1(htbtworker.configjsonfile, 1, mock_cursor) + self.assertEqual(status, True) + misshtbtd.create_update_vnf_table_1(htbtworker.configjsonfile, 0, mock_cursor) + self.assertEqual(status, True) - def test_read_hb_properties_default_from_file (self): + def test_read_hb_properties_default_from_file(self): """ TBD """ - return_val = ("10.0.0.0","1234", "postgres-test", "postgres-test", "postgres", "True", "300") + return_val = ("10.0.0.0", "1234", "postgres-test", "postgres-test", "postgres", "True", "300") misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml" - (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties_default() - self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) ) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = misshtbtd.read_hb_properties_default() + self.assertEqual( + return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + ) - @patch.dict(os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum":"1234", "pg_userName": "test","pg_passwd":"test"}) - def test_read_hb_properties_default_from_env (self): + @patch.dict( + os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum": "1234", "pg_userName": "test", "pg_passwd": "test"} + ) + def test_read_hb_properties_default_from_env(self): """ TBD """ - return_val = ("10.0.0.10","1234", "test", "test", "postgres", "True", "300") + return_val = ("10.0.0.10", "1234", "test", "test", "postgres", "True", "300") misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml" - (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties_default() - self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) ) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = misshtbtd.read_hb_properties_default() + self.assertEqual( + return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + ) - def test_read_hb_properties_from_file (self): + def test_read_hb_properties_from_file(self): """ TBD """ - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json" + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml" - - return_val = ("10.0.4.1","5432", "postgres", "postgres", "postgres", "True", "300") - (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties(htbtworker.configjsonfile) - self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) ) - @patch.dict(os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum":"1234", "pg_userName": "test","pg_passwd":"test"}) - def test_read_hb_properties_exception_handling (self): + return_val = ("10.0.4.1", "5432", "postgres", "postgres", "postgres", "True", "300") + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = misshtbtd.read_hb_properties(htbtworker.configjsonfile) + self.assertEqual( + return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + ) + + @patch.dict( + os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum": "1234", "pg_userName": "test", "pg_passwd": "test"} + ) + def test_read_hb_properties_exception_handling(self): """ TBD """ - htbtworker.configjsonfile = (os.path.dirname(__file__))+"/aa-config.json" + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/aa-config.json" misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml" - - return_val = ("10.0.0.10","1234", "test", "test", "postgres", "True", "300") - (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties(htbtworker.configjsonfile) - self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) ) + return_val = ("10.0.0.10", "1234", "test", "test", "postgres", "True", "300") + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = misshtbtd.read_hb_properties(htbtworker.configjsonfile) + self.assertEqual( + return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + ) - @patch ('mod.htbt_get_cbs_config') - def test_fetch_json_file (self, mock1): + @patch("mod.htbt_get_cbs_config") + def test_fetch_json_file(self, mock1): """ TBD """ - #mock.return_value.is_file.return_value = True + # mock.return_value.is_file.return_value = True mock1.return_value = True misshtbtd.CONFIG_PATH = "test-config.json" filename = misshtbtd.fetch_json_file() - self.assertEqual (misshtbtd.CONFIG_PATH, filename) + self.assertEqual(misshtbtd.CONFIG_PATH, filename) # mock1.return_value = False # filename = misshtbtd.fetch_json_file() # self.assertEqual ("./etc/config.json", filename) - @patch ('htbtworker.postgres_db_open') - @patch ('misshtbtd.db_table_creation_check') - @patch ('misshtbtd.create_update_vnf_table_1') + @patch("htbtworker.postgres_db_open") + @patch("misshtbtd.db_table_creation_check") + @patch("misshtbtd.create_update_vnf_table_1") def test_create_update_db(self, mock1, mock2, mock3): - ''' + """ TODO: argument ordering TBD - ''' - mock_cursor = Mock() - mock3.return_value = mock_cursor + """ + mock_cursor = Mock() + mock3.return_value = mock_cursor mock1.return_value = True mock2.return_value = True status = True - misshtbtd.create_update_db (0,htbtworker.configjsonfile, "10.0.0.0", "1234", "test","test", "testdb") + misshtbtd.create_update_db(0, htbtworker.configjsonfile, "10.0.0.0", "1234", "test", "test", "testdb") self.assertEqual(status, True) mock1.return_value = False - misshtbtd.create_update_db (0,htbtworker.configjsonfile, "10.0.0.0", "1234", "test","test", "testdb") + misshtbtd.create_update_db(0, htbtworker.configjsonfile, "10.0.0.0", "1234", "test", "test", "testdb") self.assertEqual(status, True) - @patch ('multiprocessing.Process') - @patch ('misshtbtd.create_update_db') - @patch('misshtbtd.create_update_hb_common') + @patch("multiprocessing.Process") + @patch("misshtbtd.create_update_db") + @patch("misshtbtd.create_update_hb_common") def test_create_process(self, mock1, mock2, mock3): job_list = [] mock_process = Mock() - mock_process.configure_mock( - **{ - "start.return_value": "1" - } - ) + mock_process.configure_mock(**{"start.return_value": "1"}) mock1.return_value = mock_process - job_list = misshtbtd.create_process ([],htbtworker.configjsonfile, 1) + job_list = misshtbtd.create_process([], htbtworker.configjsonfile, 1) self.assertTrue(job_list) - @patch ('multiprocessing.Process') - @patch ('misshtbtd.read_hb_common') - @patch ('misshtbtd.create_update_db') - @patch ('misshtbtd.create_update_hb_common') + @patch("multiprocessing.Process") + @patch("misshtbtd.read_hb_common") + @patch("misshtbtd.create_update_db") + @patch("misshtbtd.create_update_hb_common") def test_main(self, mock1, mock2, mock3, mock4): status = True mock_process = Mock() - mock_process.configure_mock( - **{ - "start.return_value": "1", - "pid.return_Value":"1111" - } - ) + mock_process.configure_mock(**{"start.return_value": "1", "pid.return_Value": "1111"}) mock1.return_value = mock_process current_time = round(time.time()) - mock2.return_value = ('1', 'RUNNING', 'AA', current_time) + mock2.return_value = ("1", "RUNNING", "AA", current_time) misshtbtd.main() self.assertEqual(status, True) -if __name__ == "__main__": # pragma: no cover + +if __name__ == "__main__": # pragma: no cover unittest.main() |