aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Changelog.md1
-rw-r--r--miss_htbt_service/db_monitoring.py17
-rw-r--r--miss_htbt_service/htbtworker.py138
-rw-r--r--miss_htbt_service/misshtbtd.py5
-rw-r--r--tests/test_db_monitoring.py60
-rw-r--r--tests/test_htbt_get_cbs_config.py9
-rw-r--r--tests/test_htbt_vnf_table.py36
-rw-r--r--tests/test_htbtworker.py167
-rw-r--r--tests/test_misshtbtd.py232
9 files changed, 363 insertions, 302 deletions
diff --git a/Changelog.md b/Changelog.md
index bd2b18f..014d076 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [2.6.0] - 2022/11/17
- [DCAEGEN2-2953] Code refactoring
+- [DCAEGEN2-3321] Fix black reported formatting issues
## [2.5.0] - 2022/10/25
- [DCAEGEN2-2952] Handle exception when MR is unavailable
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index cbd8f24..0d27a95 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -176,7 +176,9 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
connection_db = pm.postgres_db_open()
cur = connection_db.cursor()
- if int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING": # pragma: no cover
+ if (
+ int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING"
+ ): # pragma: no cover
_logger.info("DBM: Active DB Monitoring Instance")
cur.execute("SELECT event_name FROM vnf_table_1")
vnf_list = [item[0] for item in cur.fetchall()]
@@ -221,7 +223,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
epoc_time_sec = row[0][0]
src_name = row[0][1]
cl_flag = row[0][2]
- if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover
+ if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover
sendControlLoopEvent(
"ONSET",
pol_url,
@@ -241,7 +243,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
(cl_flag, event_name, (source_name_key + 1)),
)
connection_db.commit()
- elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover
+ elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover
sendControlLoopEvent(
"ABATED",
pol_url,
@@ -274,7 +276,7 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
"""
else: # pragma: no cover
msg = "DBM:Inactive instance or hb_common state is not RUNNING"
- _logger.info(msg)
+ _logger.info(msg)
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
@@ -282,11 +284,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
except psycopg2.DatabaseError as e:
msg = "COMMON:Error %s" % e
_logger.error(msg)
- return False
+ return False
cur.close()
break
-def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1):
+
+def db_monitoring_wrapper(current_pid, jsfile, number_of_iterations=-1):
get_logger.configure_logger("db_monitoring")
_logger.info("DBM: DBM Process started")
(
@@ -308,4 +311,4 @@ def db_monitoring_wrapper (current_pid, jsfile, number_of_iterations=-1):
if __name__ == "__main__":
current_pid = sys.argv[1]
jsfile = sys.argv[2]
- db_monitoring_wrapper (current_pid,jsfile)
+ db_monitoring_wrapper(current_pid, jsfile)
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index 3328973..22155a3 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -35,13 +35,14 @@ _logger = logging.getLogger(__name__)
configjsonfile = "../etc/config.json"
+
def process_msg(cfgjsonfile, number_of_iterations=-1):
"""
- Function to poll events from MR continuously
- and determine if matching configuration to be
- tracked for missed HB function
+ Function to poll events from MR continuously
+ and determine if matching configuration to be
+ tracked for missed HB function
"""
-
+
global mr_url
global configjsonfile
configjsonfile = cfgjsonfile
@@ -50,21 +51,21 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
while number_of_iterations != 0:
number_of_iterations -= 1
time.sleep(sleep_duration)
- print ("*** CFG json file info " + configjsonfile)
+ print("*** CFG json file info " + configjsonfile)
with open(configjsonfile, "r") as outfile:
cfg = json.load(outfile)
-
- print ("*** CFG info " + str(cfg))
+
+ print("*** CFG info " + str(cfg))
mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
-
+
username = str(cfg["pg_userName"])
password = str(cfg["pg_passwd"])
db_host = str(cfg["pg_ipAddress"])
db_port = cfg["pg_portNum"]
database_name = str(cfg["pg_dbName"])
-
- reconfig_Flag = check_process_reconfiguration (username, password, db_host, db_port, database_name)
- eventname_list = get_eventnamelist ()
+
+ reconfig_Flag = check_process_reconfiguration(username, password, db_host, db_port, database_name)
+ eventname_list = get_eventnamelist()
if "groupID" not in os.environ or "consumerID" not in os.environ:
get_url = mr_url + "/DefaultGroup/1?timeout=15000"
@@ -86,7 +87,7 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
if "mrstatus" in input_string:
continue
jlist = input_string.split("\n")
- print (jlist)
+ print(jlist)
# Process the DMaaP input message retrieved
error = False
jobj = []
@@ -102,15 +103,15 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
continue
if len(jobj) == 0:
continue
-
+
_logger.info("HBT jobj Array : %s", jobj)
for item in jobj:
srcname, lastepo, seqnum, event_name = parse_event(item)
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
-
- check_and_create_vnf2_table ()
-
+
+ check_and_create_vnf2_table()
+
if event_name in eventname_list: # pragma: no cover
cur = sql_executor("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", event_name)
row = cur.fetchone()
@@ -120,13 +121,14 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag)
- else: # pragma: no cover
+ else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", event_name, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
cur = sql_executor(
"SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
- event_name, (source_name_key + 1)
+ event_name,
+ (source_name_key + 1),
)
row = cur.fetchall()
if len(row) == 0:
@@ -138,7 +140,10 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
sql_executor(
"UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
"WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- lastepo, srcname, event_name, (source_name_key + 1)
+ lastepo,
+ srcname,
+ event_name,
+ (source_name_key + 1),
)
source_name_key = source_name_count
break
@@ -153,32 +158,30 @@ def process_msg(cfgjsonfile, number_of_iterations=-1):
else:
_logger.info("HBT:eventName is not being monitored, Ignoring JSON message")
msg = "HBT: Looping to check for new events from DMAAP"
- print ("HBT: Looping to check for new events from DMAAP")
+ print("HBT: Looping to check for new events from DMAAP")
_logger.info(msg)
-def new_vnf_entry (event_name, source_name_key, lastepo, srcname, cl_flag):
- """
- Wrapper function to update event to tracking tables
- """
-
- sql_executor(
- "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- event_name, source_name_key, lastepo, srcname, cl_flag
- )
- sql_executor(
- "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- source_name_key, event_name
- )
-
-def parse_event (jsonstring):
+
+def new_vnf_entry(event_name, source_name_key, lastepo, srcname, cl_flag):
+ """
+ Wrapper function to update event to tracking tables
+ """
+
+ sql_executor(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", event_name, source_name_key, lastepo, srcname, cl_flag
+ )
+ sql_executor("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", source_name_key, event_name)
+
+
+def parse_event(jsonstring):
"""
- Function to parse incoming event as json object
+ Function to parse incoming event as json object
and parse out required attributes
"""
- _logger.info("HBT jsonstring: %s", jsonstring)
- #convert string to object
+ _logger.info("HBT jsonstring: %s", jsonstring)
+ # convert string to object
jitem = json.loads(jsonstring)
-
+
try:
srcname = jitem["event"]["commonEventHeader"]["sourceName"]
lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
@@ -189,22 +192,26 @@ def parse_event (jsonstring):
event_name = jitem["event"]["commonEventHeader"]["eventName"]
msg = "HBT:Newly received HB event values ::", event_name, lastepo, srcname
_logger.info(msg)
- return (srcname,lastepo,seqnum,event_name)
+ return (srcname, lastepo, seqnum, event_name)
except Exception as err:
msg = "HBT message process error - ", err
_logger.error(msg)
-def get_eventnamelist ():
+
+def get_eventnamelist():
"""
Function to fetch active monitored eventname list
"""
- cur = sql_executor("SELECT event_name FROM vnf_table_1",)
+ cur = sql_executor(
+ "SELECT event_name FROM vnf_table_1",
+ )
eventname_list = [item[0] for item in cur.fetchall()]
msg = "\n\nHBT:eventnameList values ", eventname_list
_logger.info(msg)
return eventname_list
-def check_and_create_vnf2_table ():
+
+def check_and_create_vnf2_table():
"""
Check and create vnf_table_2 used for tracking HB entries
"""
@@ -215,7 +222,7 @@ def check_and_create_vnf2_table ():
if database_names is not None:
if "vnf_table_2" in database_names:
table_exist = True
-
+
if table_exist == False:
msg = "HBT:Creating vnf_table_2"
_logger.info(msg)
@@ -228,8 +235,8 @@ def check_and_create_vnf2_table ():
LAST_EPO_TIME BIGINT,
SOURCE_NAME varchar,
CL_FLAG integer
- )"""
- ,)
+ )""",
+ )
else:
msg = "HBT:vnf_table_2 is already there"
_logger.info(msg)
@@ -238,15 +245,14 @@ def check_and_create_vnf2_table ():
msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
-
+
+
def check_process_reconfiguration(username, password, host, port, database_name):
"""
Verify if DB configuration in progress
"""
while True:
- hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
- username, password, host, port, database_name
- )
+ hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(username, password, host, port, database_name)
if hbc_state == "RECONFIGURATION":
_logger.info("HBT: RECONFIGURATION in-progress. Waiting for hb_common state to become RUNNING")
time.sleep(10)
@@ -255,12 +261,13 @@ def check_process_reconfiguration(username, password, host, port, database_name)
break
return False
+
def postgres_db_open():
"""
- Wrapper function for returning DB connection object
+ Wrapper function for returning DB connection object
"""
-
- global configjsonfile
+
+ global configjsonfile
(
ip_address,
@@ -275,22 +282,22 @@ def postgres_db_open():
return connection
-def sql_executor (query, *values):
+def sql_executor(query, *values):
"""
wrapper method for DB operation
- """
- _logger.info("HBT query: %s values: %s", query, values)
-
+ """
+ _logger.info("HBT query: %s values: %s", query, values)
+
connection_db = postgres_db_open()
cur = connection_db.cursor()
- cur.execute(query, values)
+ cur.execute(query, values)
update_commands = ["CREATE", "INSERT", "UPDATE"]
- if any (x in query for x in update_commands):
- try:
- connection_db.commit() # <--- makes sure the change is shown in the database
- except psycopg2.DatabaseError as e:
- msg = "COMMON:Error %s" % e
- _logger.error(msg)
+ if any(x in query for x in update_commands):
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
return cur
@@ -310,7 +317,6 @@ if __name__ == "__main__":
configjsonfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
- msg = "HBT:The config file name passed is -%s", configjsonfile
+ msg = "HBT:The config file name passed is -%s", configjsonfile
_logger.info(msg)
process_msg(configjsonfile)
-
diff --git a/miss_htbt_service/misshtbtd.py b/miss_htbt_service/misshtbtd.py
index 23d4835..1b96f38 100644
--- a/miss_htbt_service/misshtbtd.py
+++ b/miss_htbt_service/misshtbtd.py
@@ -126,6 +126,7 @@ def create_update_hb_common(update_flg, process_id, state, user_name, password,
heartbeat.commit_and_close_db(connection_db)
cur.close()
+
def db_table_creation_check(connection_db, table_name):
cur = connection_db.cursor()
try:
@@ -141,7 +142,8 @@ def db_table_creation_check(connection_db, table_name):
_logger.error(msg)
finally:
cur.close()
-
+
+
def create_update_vnf_table_1(jsfile, update_db, connection_db):
with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
@@ -375,7 +377,6 @@ def create_process(job_list, jsfile, pid_current):
return job_list
-
def main():
get_logger.configure_logger("misshtbtd")
pid_current = os.getpid()
diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py
index b9a644e..5315a66 100644
--- a/tests/test_db_monitoring.py
+++ b/tests/test_db_monitoring.py
@@ -21,49 +21,60 @@ import logging
import requests
import tempfile
import os
-import json
+import json
import unittest
from unittest.mock import *
from _pytest.outcomes import skip
_logger = logging.getLogger(__name__)
+
+
class Test_db_monitoring(unittest.TestCase):
-
def setUp(self):
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
-
- @patch('requests.post')
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
+
+ @patch("requests.post")
def test_sendControlLoopEvent(self, mock1):
status = True
- mock_resp = Mock()
- mock_resp.configure_mock(
- **{
- "status_code": 200
- }
- )
- mock1.return_value = mock_resp
- db_monitoring.sendControlLoopEvent("ONSET", "ABC","1.0","vFW","vFW","VNF","NODE","1234567890","VFW","1.0","DCAE")
+ mock_resp = Mock()
+ mock_resp.configure_mock(**{"status_code": 200})
+ mock1.return_value = mock_resp
+ db_monitoring.sendControlLoopEvent(
+ "ONSET", "ABC", "1.0", "vFW", "vFW", "VNF", "NODE", "1234567890", "VFW", "1.0", "DCAE"
+ )
self.assertEqual(status, True)
- db_monitoring.sendControlLoopEvent("ONSET", "ABC","1.0","vFW","vFW","VM","NODE","1234567890","VFW","1.0","DCAE")
+ db_monitoring.sendControlLoopEvent(
+ "ONSET", "ABC", "1.0", "vFW", "vFW", "VM", "NODE", "1234567890", "VFW", "1.0", "DCAE"
+ )
self.assertEqual(status, True)
- db_monitoring.sendControlLoopEvent("ABATED", "ABC","1.0","vFW","vFW","VNF","NODE","1234567890","VFW","1.0","DCAE")
+ db_monitoring.sendControlLoopEvent(
+ "ABATED", "ABC", "1.0", "vFW", "vFW", "VNF", "NODE", "1234567890", "VFW", "1.0", "DCAE"
+ )
self.assertEqual(status, True)
- db_monitoring.sendControlLoopEvent("ABATED", "ABC","1.0","vFW","vFW","VM","NODE","1234567890","VFW","1.0","DCAE")
+ db_monitoring.sendControlLoopEvent(
+ "ABATED", "ABC", "1.0", "vFW", "vFW", "VM", "NODE", "1234567890", "VFW", "1.0", "DCAE"
+ )
self.assertEqual(status, True)
- @patch('misshtbtd.read_hb_common',return_value = ("1234","RUNNING", "XYZ", 1234))
- @patch('htbtworker.postgres_db_open')
+ @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234))
+ @patch("htbtworker.postgres_db_open")
def test_db_monitoring(self, mock1, mock2):
status = True
- mock_cursor = Mock()
+ mock_cursor = Mock()
mock2.cursor.return_value = mock_cursor
- db_monitoring.db_monitoring("111",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name")
+ db_monitoring.db_monitoring(
+ "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ )
self.assertEqual(status, True)
- db_monitoring.db_monitoring("1234",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name")
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ )
self.assertEqual(status, True)
- mock1.cursor.return_value = ("1234","RECONFIGURATION", "XYZ", 1234)
- db_monitoring.db_monitoring("1234",htbtworker.configjsonfile ,"testuser","testpwd","10.0.0.0","1234","db_name")
+ mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234)
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ )
self.assertEqual(status, True)
def test_db_monitoring_wrapper(self):
@@ -71,5 +82,6 @@ class Test_db_monitoring(unittest.TestCase):
db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0)
self.assertEqual(status, True)
-if __name__ == "__main__": # pragma: no cover
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_htbt_get_cbs_config.py b/tests/test_htbt_get_cbs_config.py
index 99e347c..2a2b641 100644
--- a/tests/test_htbt_get_cbs_config.py
+++ b/tests/test_htbt_get_cbs_config.py
@@ -105,7 +105,7 @@ class test_get_cbs_config(unittest.TestCase):
)
# create copy of snmphtbt.json for pytest
- #pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json"
+ # pytest_json_config = "/tmp/opt/app/miss_htbt_service/etc/config.json"
pytest_json_config = "test-config.json"
with open(pytest_json_config, "w") as outfile:
outfile.write(pytest_json_data)
@@ -130,19 +130,18 @@ class test_get_cbs_config(unittest.TestCase):
print("result: %s" % result)
self.assertEqual(result, True)
- @patch('misshtbtd.create_update_hb_common')
- @patch('misshtbtd.read_hb_common')
+ @patch("misshtbtd.create_update_hb_common")
+ @patch("misshtbtd.read_hb_common")
def test_poll_cbs(self, mock1, mock2):
"""
TBD
"""
status = True
current_time = round(time.time())
- mock1.return_value = ('1', 'RUNNING', 'AA', current_time)
+ mock1.return_value = ("1", "RUNNING", "AA", current_time)
# configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
configjsonfile = "test-config.json"
os.environ.update(CBS_HTBT_JSON=configjsonfile)
os.environ["pytest"] = "test"
cp.poll_cbs(1)
self.assertEqual(status, True)
-
diff --git a/tests/test_htbt_vnf_table.py b/tests/test_htbt_vnf_table.py
index 4b6c3f1..e425766 100644
--- a/tests/test_htbt_vnf_table.py
+++ b/tests/test_htbt_vnf_table.py
@@ -52,39 +52,39 @@ class test_vnf_tables(unittest.TestCase):
global ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval
ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = hb_properties()
- @patch('htbtworker.postgres_db_open')
- @patch('misshtbtd.db_table_creation_check', return_value=True)
+ @patch("htbtworker.postgres_db_open")
+ @patch("misshtbtd.db_table_creation_check", return_value=True)
def test_validate_vnf_table_1(self, mock, mock1):
result = verify_DB_creation_1(user_name, password, ip_address, port_num, db_name)
self.assertEqual(result, True)
- @patch('htbtworker.postgres_db_open')
- @patch('misshtbtd.db_table_creation_check', return_value=True)
+ @patch("htbtworker.postgres_db_open")
+ @patch("misshtbtd.db_table_creation_check", return_value=True)
def test_validate_vnf_table_2(self, mock, mock1):
result = verify_DB_creation_2(user_name, password, ip_address, port_num, db_name)
self.assertEqual(result, True)
- @patch('htbtworker.postgres_db_open')
- @patch('misshtbtd.db_table_creation_check', return_value=True)
+ @patch("htbtworker.postgres_db_open")
+ @patch("misshtbtd.db_table_creation_check", return_value=True)
def test_validate_hb_common(self, mock, mock1):
result = verify_DB_creation_hb_common()
self.assertEqual(result, True)
-
- @patch('cbs_polling.poll_cbs')
+
+ @patch("cbs_polling.poll_cbs")
def test_cbspolling(self, mock):
# Check if no exception thrown
verify_cbspolling()
- @patch('misshtbtd.fetch_json_file')
+ @patch("misshtbtd.fetch_json_file")
def test_fetch_json_file(self, mock1):
- configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
mock1.return_value = configjsonfile
-
+
result = verify_fetch_json_file()
_logger.info(result)
self.assertEqual(result, True)
- @patch('misshtbtd.main')
+ @patch("misshtbtd.main")
def test_misshtbtdmain(self, mock):
result = verify_misshtbtdmain()
_logger.info(result)
@@ -95,14 +95,14 @@ class test_vnf_tables(unittest.TestCase):
_logger.info(result)
self.assertEqual(result, True)
- @patch('misshtbtd.fetch_json_file')
- @patch('misshtbtd.read_hb_common')
- @patch('db_monitoring.db_monitoring')
+ @patch("misshtbtd.fetch_json_file")
+ @patch("misshtbtd.read_hb_common")
+ @patch("db_monitoring.db_monitoring")
def test_dbmonitoring(self, mock1, mock2, mock3):
- configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
mock1.return_value = configjsonfile
- mock2.return_value = ("1234","RUNNING", "XYZ", 1234)
-
+ mock2.return_value = ("1234", "RUNNING", "XYZ", 1234)
+
result = verify_dbmonitoring()
_logger.info(result)
self.assertEqual(result, True)
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py
index ee03ddb..4066a9f 100644
--- a/tests/test_htbtworker.py
+++ b/tests/test_htbtworker.py
@@ -18,148 +18,159 @@
import htbtworker
import os
import tempfile
-import json
+import json
import unittest
from unittest.mock import *
from _pytest.outcomes import skip
-class Test_htbtworker(unittest.TestCase):
+class Test_htbtworker(unittest.TestCase):
def setUp(self):
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
- @patch('requests.get')
- @patch('htbtworker.check_process_reconfiguration', return_value=False)
- @patch('htbtworker.get_eventnamelist')
- @patch('htbtworker.sql_executor')
- def test_process_msg(self, mock1, mock2, mock3, sqlmock1):
+ @patch("requests.get")
+ @patch("htbtworker.check_process_reconfiguration", return_value=False)
+ @patch("htbtworker.get_eventnamelist")
+ @patch("htbtworker.sql_executor")
+ def test_process_msg(self, mock1, mock2, mock3, sqlmock1):
"""
Test to verify event processing using mock
TBD - Negative test
"""
-
+
status = True
- dmaap_data = [{"event":{"commonEventHeader":{"startEpochMicrosec":1544608845841,"sourceId":"VNFB_SRC5","eventId":"mvfs10","nfcNamingCode":"VNFB","timeZoneOffset":"UTC-05:30","reportingEntityId":"cc305d54-75b4-431b-adb2-eb6b9e541234","eventType":"platform","priority":"Normal","version":"4.0.2","reportingEntityName":"ibcx0001vm002oam001","sequence":1000,"domain":"heartbeat","lastEpochMicrosec":1544608845841,"eventName":"Heartbeat_vFW","vesEventListenerVersion":"7.0.2","sourceName":"SOURCE_NAME2","nfNamingCode":"VNFB"},"heartbeatFields":{"heartbeatInterval":20,"heartbeatFieldsVersion":"3.0"}}}]
-
- mock_resp = Mock()
- mock_resp.configure_mock(
- **{
- "text": json.dumps(dmaap_data)
- }
- )
+ dmaap_data = [
+ {
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1544608845841,
+ "sourceId": "VNFB_SRC5",
+ "eventId": "mvfs10",
+ "nfcNamingCode": "VNFB",
+ "timeZoneOffset": "UTC-05:30",
+ "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+ "eventType": "platform",
+ "priority": "Normal",
+ "version": "4.0.2",
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sequence": 1000,
+ "domain": "heartbeat",
+ "lastEpochMicrosec": 1544608845841,
+ "eventName": "Heartbeat_vFW",
+ "vesEventListenerVersion": "7.0.2",
+ "sourceName": "SOURCE_NAME2",
+ "nfNamingCode": "VNFB",
+ },
+ "heartbeatFields": {"heartbeatInterval": 20, "heartbeatFieldsVersion": "3.0"},
+ }
+ }
+ ]
+
+ mock_resp = Mock()
+ mock_resp.configure_mock(**{"text": json.dumps(dmaap_data)})
mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")]
- mock1.return_value = mock_resp
+ mock1.return_value = mock_resp
filename = "test-config.json"
htbtworker.process_msg(filename, number_of_iterations=1)
self.assertEqual(status, True)
-
def test_parse_event(self):
"""
test_parse_event() opens the file test1.json and returns attributes
"""
- filename = (os.path.dirname(__file__))+"/test1.json"
- with open(filename,"r") as fp:
- data = fp.read()
- srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data)
+ filename = (os.path.dirname(__file__)) + "/test1.json"
+ with open(filename, "r") as fp:
+ data = fp.read()
+ srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
self.assertEqual(srcname, "SOURCE_NAME1")
self.assertEqual(event_name, "Heartbeat_vDNS")
- filename = (os.path.dirname(__file__))+"/test4.json"
- with open(filename,"r") as fp:
- data = fp.read()
- srcname,lastepo,seqnum,event_name = htbtworker.parse_event(data)
+ filename = (os.path.dirname(__file__)) + "/test4.json"
+ with open(filename, "r") as fp:
+ data = fp.read()
+ srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
self.assertEqual(srcname, "zalp1bmdns01cmd010")
self.assertEqual(event_name, "Heartbeat_vDNS")
-
- @patch('htbtworker.sql_executor')
- def test_create_and_check_vnf2_table (self, mock_settings):
+ @patch("htbtworker.sql_executor")
+ def test_create_and_check_vnf2_table(self, mock_settings):
"""
Test to verify existence of given table
"""
- mock_cursor = Mock()
- mock_cursor.configure_mock(
- **{
- "fetchone.return_value": [("vnf_table_2")]
- }
- )
+ mock_cursor = Mock()
+ mock_cursor.configure_mock(**{"fetchone.return_value": [("vnf_table_2")]})
mock_settings.return_value = mock_cursor
- status = htbtworker.check_and_create_vnf2_table ()
- self.assertEqual(status, True)
-
- with patch('htbtworker.sql_executor', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
- status = htbtworker.check_and_create_vnf2_table ()
- self.assertEqual(False, status)
-
- @patch('htbtworker.sql_executor')
- def test_new_vnf_entry (self, sql_mock):
+ status = htbtworker.check_and_create_vnf2_table()
+ self.assertEqual(status, True)
+
+ with patch("htbtworker.sql_executor", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ status = htbtworker.check_and_create_vnf2_table()
+ self.assertEqual(False, status)
+
+ @patch("htbtworker.sql_executor")
+ def test_new_vnf_entry(self, sql_mock):
"""
Check to verify if new node entry is made for tracking HB
"""
status = True
- htbtworker.new_vnf_entry ("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1)
+ htbtworker.new_vnf_entry("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1)
self.assertEqual(status, True)
- @patch('htbtworker.sql_executor')
- def test_get_eventnamelist (self, sql_mock):
+ @patch("htbtworker.sql_executor")
+ def test_get_eventnamelist(self, sql_mock):
"""
- Test to verify eventname list is returned from vnf_table_1
- TBD - List comparison
+ Test to verify eventname list is returned from vnf_table_1
+ TBD - List comparison
"""
eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")]
- mock_cursor = Mock()
- mock_cursor.configure_mock(
- **{
- "fetchall.return_value": eventname_list
- }
- )
+ mock_cursor = Mock()
+ mock_cursor.configure_mock(**{"fetchall.return_value": eventname_list})
sql_mock.return_value = mock_cursor
- return_list = htbtworker.get_eventnamelist ()
- self.assertIn("Heartbeat_vDNS", return_list)
-
- @patch('htbtworker.postgres_db_open')
- def test_sql_executor (self, db_mock):
+ return_list = htbtworker.get_eventnamelist()
+ self.assertIn("Heartbeat_vDNS", return_list)
+
+ @patch("htbtworker.postgres_db_open")
+ def test_sql_executor(self, db_mock):
"""
- Test sql executor wrapper method
+ Test sql executor wrapper method
"""
- htbtworker.sql_executor ("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
- htbtworker.sql_executor ("INSERT into information_schema.tables,")
+ htbtworker.sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
+ htbtworker.sql_executor("INSERT into information_schema.tables,")
connection_db = db_mock
- with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(False, flag)
- @patch('psycopg2.connect')
- def test_postgres_db_open (self, mock):
+ @patch("psycopg2.connect")
+ def test_postgres_db_open(self, mock):
"""
Test wrapper for postgres db connection
"""
conn = htbtworker.postgres_db_open()
self.assertIsNotNone(conn)
- @patch('misshtbtd.read_hb_common')
- def test_check_process_reconfiguration (self, mock):
+ @patch("misshtbtd.read_hb_common")
+ def test_check_process_reconfiguration(self, mock):
"""
Test if DB is in reconfiguration state
"""
- mock.return_value = ("1234","RUNNING", "XYZ", 1234)
- flag = htbtworker.check_process_reconfiguration("test", "test","x.x.x.x", "1234", "test_db")
+ mock.return_value = ("1234", "RUNNING", "XYZ", 1234)
+ flag = htbtworker.check_process_reconfiguration("test", "test", "x.x.x.x", "1234", "test_db")
self.assertEqual(False, flag)
-
- @patch('htbtworker.postgres_db_open')
- def test_commit_and_close_db (self, db_mock):
+
+ @patch("htbtworker.postgres_db_open")
+ def test_commit_and_close_db(self, db_mock):
"""
Test commit and close db
"""
connection_db = db_mock
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(True, flag)
- with patch('htbtworker.postgres_db_open.commit', new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
+ with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
flag = htbtworker.commit_and_close_db(connection_db)
self.assertEqual(False, flag)
-
-
-if __name__ == "__main__": # pragma: no cover
+
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()
diff --git a/tests/test_misshtbtd.py b/tests/test_misshtbtd.py
index 0fc8a24..0374a6b 100644
--- a/tests/test_misshtbtd.py
+++ b/tests/test_misshtbtd.py
@@ -20,196 +20,224 @@ import time
import psycopg2
import os
import tempfile
-import json
+import json
import unittest
from unittest.mock import *
from _pytest.outcomes import skip
from pickle import FALSE
-class Test_misshtbtd(unittest.TestCase):
+class Test_misshtbtd(unittest.TestCase):
def setUp(self):
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
- @patch ('psycopg2.connect')
+ @patch("psycopg2.connect")
def test_create_database(self, mock1):
status = True
- mock_cursor = MagicMock()
- mock_cursor.configure_mock(
- **{
- "fetchone.return_value": [("1")]
- }
+ mock_cursor = MagicMock()
+ mock_cursor.configure_mock(**{"fetchone.return_value": [("1")]})
+ mock1.return_value = mock_cursor
+ misshtbtd.create_database(
+ "vnf_table_1", htbtworker.configjsonfile, "10.0.0.0", "1234", "testuser", "testpwd", "heartbeatdb"
)
- mock1.return_value = mock_cursor
- misshtbtd.create_database("vnf_table_1",htbtworker.configjsonfile, "10.0.0.0","1234","testuser","testpwd","heartbeatdb")
self.assertEqual(status, True)
-
- @patch ('htbtworker.postgres_db_open')
+
+ @patch("htbtworker.postgres_db_open")
def test_read_hb_common(self, mock1):
-
- mock_cursor = Mock()
+
+ mock_cursor = Mock()
mock_cursor.configure_mock(
- **{
- "cursor.return_value.fetchall.return_value": [["1", "AA", "123456789", "RUNNING"]]
- }
+ **{"cursor.return_value.fetchall.return_value": [["1", "AA", "123456789", "RUNNING"]]}
)
mock1.return_value = mock_cursor
- self.assertEqual(('1', 'RUNNING', 'AA', '123456789'), misshtbtd.read_hb_common("testuser","testpwd","10.0.0.0","1234","heartbeatdb"))
+ self.assertEqual(
+ ("1", "RUNNING", "AA", "123456789"),
+ misshtbtd.read_hb_common("testuser", "testpwd", "10.0.0.0", "1234", "heartbeatdb"),
+ )
- @patch ('misshtbtd.db_table_creation_check')
- @patch ('htbtworker.postgres_db_open')
+ @patch("misshtbtd.db_table_creation_check")
+ @patch("htbtworker.postgres_db_open")
def test_create_update_hb_common(self, mock1, mock2):
- '''
+ """
TODO: argument ordering TBD
- '''
- mock_cursor = Mock()
- mock1.return_value = mock_cursor
+ """
+ mock_cursor = Mock()
+ mock1.return_value = mock_cursor
mock2.return_value = True
status = True
- misshtbtd.create_update_hb_common (0,111, "RUNNING", "testuser","testpwd","10.0.0.0","1234","testdb")
+ misshtbtd.create_update_hb_common(0, 111, "RUNNING", "testuser", "testpwd", "10.0.0.0", "1234", "testdb")
self.assertEqual(status, True)
mock2.return_value = False
- misshtbtd.create_update_hb_common (1,111, "RUNNING", "testuser","testpwd","10.0.0.0","1234","testdb")
+ misshtbtd.create_update_hb_common(1, 111, "RUNNING", "testuser", "testpwd", "10.0.0.0", "1234", "testdb")
self.assertEqual(status, True)
- def test_db_table_creation_check (self):
+ def test_db_table_creation_check(self):
"""
Test to verify existence of given table
"""
- mock_cursor = Mock()
- mock_cursor.configure_mock(
- **{
- "cursor.return_value.fetchone.return_value": ("vnf_table_2")
- }
- )
- status = misshtbtd.db_table_creation_check (mock_cursor,"vnf_table_2")
- self.assertEqual(status, True)
+ mock_cursor = Mock()
+ mock_cursor.configure_mock(**{"cursor.return_value.fetchone.return_value": ("vnf_table_2")})
+ status = misshtbtd.db_table_creation_check(mock_cursor, "vnf_table_2")
+ self.assertEqual(status, True)
- @patch ('misshtbtd.db_table_creation_check')
- def test_create_update_vnf_table_1 (self, mock1):
+ @patch("misshtbtd.db_table_creation_check")
+ def test_create_update_vnf_table_1(self, mock1):
"""
TBD
"""
status = True
- mock_cursor = Mock()
+ mock_cursor = Mock()
mock_cursor.configure_mock(
- **{
- "cursor.return_value.fetchall.return_value": [["Heartbeat_vDNS"],["Heartbeat_vFW"]]
- }
+ **{"cursor.return_value.fetchall.return_value": [["Heartbeat_vDNS"], ["Heartbeat_vFW"]]}
)
- misshtbtd.create_update_vnf_table_1 (htbtworker.configjsonfile,1, mock_cursor)
- self.assertEqual(status, True)
- misshtbtd.create_update_vnf_table_1 (htbtworker.configjsonfile,0, mock_cursor)
- self.assertEqual(status, True)
+ misshtbtd.create_update_vnf_table_1(htbtworker.configjsonfile, 1, mock_cursor)
+ self.assertEqual(status, True)
+ misshtbtd.create_update_vnf_table_1(htbtworker.configjsonfile, 0, mock_cursor)
+ self.assertEqual(status, True)
- def test_read_hb_properties_default_from_file (self):
+ def test_read_hb_properties_default_from_file(self):
"""
TBD
"""
- return_val = ("10.0.0.0","1234", "postgres-test", "postgres-test", "postgres", "True", "300")
+ return_val = ("10.0.0.0", "1234", "postgres-test", "postgres-test", "postgres", "True", "300")
misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml"
- (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties_default()
- self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) )
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = misshtbtd.read_hb_properties_default()
+ self.assertEqual(
+ return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+ )
- @patch.dict(os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum":"1234", "pg_userName": "test","pg_passwd":"test"})
- def test_read_hb_properties_default_from_env (self):
+ @patch.dict(
+ os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum": "1234", "pg_userName": "test", "pg_passwd": "test"}
+ )
+ def test_read_hb_properties_default_from_env(self):
"""
TBD
"""
- return_val = ("10.0.0.10","1234", "test", "test", "postgres", "True", "300")
+ return_val = ("10.0.0.10", "1234", "test", "test", "postgres", "True", "300")
misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml"
- (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties_default()
- self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) )
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = misshtbtd.read_hb_properties_default()
+ self.assertEqual(
+ return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+ )
- def test_read_hb_properties_from_file (self):
+ def test_read_hb_properties_from_file(self):
"""
TBD
"""
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/test-config.json"
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml"
-
- return_val = ("10.0.4.1","5432", "postgres", "postgres", "postgres", "True", "300")
- (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties(htbtworker.configjsonfile)
- self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) )
- @patch.dict(os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum":"1234", "pg_userName": "test","pg_passwd":"test"})
- def test_read_hb_properties_exception_handling (self):
+ return_val = ("10.0.4.1", "5432", "postgres", "postgres", "postgres", "True", "300")
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = misshtbtd.read_hb_properties(htbtworker.configjsonfile)
+ self.assertEqual(
+ return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+ )
+
+ @patch.dict(
+ os.environ, {"pg_ipAddress": "10.0.0.10", "pg_portNum": "1234", "pg_userName": "test", "pg_passwd": "test"}
+ )
+ def test_read_hb_properties_exception_handling(self):
"""
TBD
"""
- htbtworker.configjsonfile = (os.path.dirname(__file__))+"/aa-config.json"
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/aa-config.json"
misshtbtd.hb_properties_file = (os.path.dirname(__file__)) + "/hbproperties-test.yaml"
-
- return_val = ("10.0.0.10","1234", "test", "test", "postgres", "True", "300")
- (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) = misshtbtd.read_hb_properties(htbtworker.configjsonfile)
- self.assertEqual (return_val,(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) )
+ return_val = ("10.0.0.10", "1234", "test", "test", "postgres", "True", "300")
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = misshtbtd.read_hb_properties(htbtworker.configjsonfile)
+ self.assertEqual(
+ return_val, (ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+ )
- @patch ('mod.htbt_get_cbs_config')
- def test_fetch_json_file (self, mock1):
+ @patch("mod.htbt_get_cbs_config")
+ def test_fetch_json_file(self, mock1):
"""
TBD
"""
- #mock.return_value.is_file.return_value = True
+ # mock.return_value.is_file.return_value = True
mock1.return_value = True
misshtbtd.CONFIG_PATH = "test-config.json"
filename = misshtbtd.fetch_json_file()
- self.assertEqual (misshtbtd.CONFIG_PATH, filename)
+ self.assertEqual(misshtbtd.CONFIG_PATH, filename)
# mock1.return_value = False
# filename = misshtbtd.fetch_json_file()
# self.assertEqual ("./etc/config.json", filename)
- @patch ('htbtworker.postgres_db_open')
- @patch ('misshtbtd.db_table_creation_check')
- @patch ('misshtbtd.create_update_vnf_table_1')
+ @patch("htbtworker.postgres_db_open")
+ @patch("misshtbtd.db_table_creation_check")
+ @patch("misshtbtd.create_update_vnf_table_1")
def test_create_update_db(self, mock1, mock2, mock3):
- '''
+ """
TODO: argument ordering TBD
- '''
- mock_cursor = Mock()
- mock3.return_value = mock_cursor
+ """
+ mock_cursor = Mock()
+ mock3.return_value = mock_cursor
mock1.return_value = True
mock2.return_value = True
status = True
- misshtbtd.create_update_db (0,htbtworker.configjsonfile, "10.0.0.0", "1234", "test","test", "testdb")
+ misshtbtd.create_update_db(0, htbtworker.configjsonfile, "10.0.0.0", "1234", "test", "test", "testdb")
self.assertEqual(status, True)
mock1.return_value = False
- misshtbtd.create_update_db (0,htbtworker.configjsonfile, "10.0.0.0", "1234", "test","test", "testdb")
+ misshtbtd.create_update_db(0, htbtworker.configjsonfile, "10.0.0.0", "1234", "test", "test", "testdb")
self.assertEqual(status, True)
- @patch ('multiprocessing.Process')
- @patch ('misshtbtd.create_update_db')
- @patch('misshtbtd.create_update_hb_common')
+ @patch("multiprocessing.Process")
+ @patch("misshtbtd.create_update_db")
+ @patch("misshtbtd.create_update_hb_common")
def test_create_process(self, mock1, mock2, mock3):
job_list = []
mock_process = Mock()
- mock_process.configure_mock(
- **{
- "start.return_value": "1"
- }
- )
+ mock_process.configure_mock(**{"start.return_value": "1"})
mock1.return_value = mock_process
- job_list = misshtbtd.create_process ([],htbtworker.configjsonfile, 1)
+ job_list = misshtbtd.create_process([], htbtworker.configjsonfile, 1)
self.assertTrue(job_list)
- @patch ('multiprocessing.Process')
- @patch ('misshtbtd.read_hb_common')
- @patch ('misshtbtd.create_update_db')
- @patch ('misshtbtd.create_update_hb_common')
+ @patch("multiprocessing.Process")
+ @patch("misshtbtd.read_hb_common")
+ @patch("misshtbtd.create_update_db")
+ @patch("misshtbtd.create_update_hb_common")
def test_main(self, mock1, mock2, mock3, mock4):
status = True
mock_process = Mock()
- mock_process.configure_mock(
- **{
- "start.return_value": "1",
- "pid.return_Value":"1111"
- }
- )
+ mock_process.configure_mock(**{"start.return_value": "1", "pid.return_Value": "1111"})
mock1.return_value = mock_process
current_time = round(time.time())
- mock2.return_value = ('1', 'RUNNING', 'AA', current_time)
+ mock2.return_value = ("1", "RUNNING", "AA", current_time)
misshtbtd.main()
self.assertEqual(status, True)
-if __name__ == "__main__": # pragma: no cover
+
+if __name__ == "__main__": # pragma: no cover
unittest.main()