diff options
-rw-r--r-- | Changelog.md | 3 | ||||
-rw-r--r-- | miss_htbt_service/db_monitoring.py | 237 | ||||
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/test_db_monitoring.py | 190 | ||||
-rw-r--r-- | version.properties | 2 |
6 files changed, 309 insertions, 127 deletions
diff --git a/Changelog.md b/Changelog.md index 014d076..e0f6bdb 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [2.6.1] - 2023/02/15 +- [DCAEGEN2-2953] Handle no-pragma cases + ## [2.6.0] - 2022/11/17 - [DCAEGEN2-2953] Code refactoring - [DCAEGEN2-3321] Fix black reported formatting issues diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py index 0d27a95..472c356 100644 --- a/miss_htbt_service/db_monitoring.py +++ b/miss_htbt_service/db_monitoring.py @@ -156,9 +156,10 @@ def sendControlLoopEvent( return True -def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name): + +def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name, sleeptime=20): while True: - time.sleep(20) + time.sleep(sleeptime) try: with open(json_file, "r") as outfile: @@ -168,125 +169,129 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_ msg = "Json file process error : ", err _logger.error(msg) continue + db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name) + break + +def db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name) : + hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) + source_name = socket.gethostname() + source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) + + connection_db = pm.postgres_db_open() + cur = connection_db.cursor() + if ( + int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING" + ): + _logger.info("DBM: Active DB Monitoring Instance") + cur.execute("SELECT event_name FROM vnf_table_1") + vnf_list = [item[0] for item in cur.fetchall()] + for event_name in vnf_list: + cur.execute("SELECT current_state FROM hb_common") + rows = cur.fetchall() + hbc_state = rows[0][0] + if hbc_state == "RECONFIGURATION": + _logger.info("DBM:Waiting for hb_common state to become RUNNING") + break - hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common( - user_name, password, ip_address, port_num, db_name - ) - source_name = socket.gethostname() - source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", "")) - connection_db = pm.postgres_db_open() - cur = connection_db.cursor() - if ( - int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING" - ): # pragma: no cover - _logger.info("DBM: Active DB Monitoring Instance") - cur.execute("SELECT event_name FROM vnf_table_1") - vnf_list = [item[0] for item in cur.fetchall()] - for event_name in vnf_list: - cur.execute("SELECT current_state FROM hb_common") - rows = cur.fetchall() - hbc_state = rows[0][0] - if hbc_state == "RECONFIGURATION": - _logger.info("DBM:Waiting for hb_common state to become RUNNING") - break - - cur.execute( - "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " - "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " - "target, version FROM vnf_table_1 WHERE event_name = %s", - (event_name,), - ) - rows = cur.fetchall() - validity_flag = rows[0][0] - source_name_count = rows[0][1] - heartbeat_interval = rows[0][2] - heartbeat_missed_count = rows[0][3] - closed_control_loop_name = rows[0][4] - policy_version = rows[0][5] - policy_name = rows[0][6] - policy_scope = rows[0][7] - target_type = rows[0][8] - target = rows[0][9] - version = rows[0][10] - comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 - if validity_flag == 1: - for source_name_key in range(source_name_count): - epoc_time = int(round(time.time() * 1000)) + cur.execute( + "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, " + "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, " + "target, version FROM vnf_table_1 WHERE event_name = %s", + (event_name,), + ) + rows = cur.fetchall() + validity_flag = rows[0][0] + source_name_count = rows[0][1] + heartbeat_interval = rows[0][2] + heartbeat_missed_count = rows[0][3] + closed_control_loop_name = rows[0][4] + policy_version = rows[0][5] + policy_name = rows[0][6] + policy_scope = rows[0][7] + target_type = rows[0][8] + target = rows[0][9] + version = rows[0][10] + comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000 + if validity_flag == 1: + for source_name_key in range(source_name_count): + epoc_time = int(round(time.time() * 1000)) + cur.execute( + "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " + "event_name = %s AND source_name_key = %s", + (event_name, (source_name_key + 1)), + ) + row = cur.fetchall() + if len(row) == 0: + continue + epoc_time_sec = row[0][0] + src_name = row[0][1] + cl_flag = row[0][2] + if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: + sendControlLoopEvent( + "ONSET", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + src_name, + epoc_time, + closed_control_loop_name, + version, + target, + ) + cl_flag = 1 cur.execute( - "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE " - "event_name = %s AND source_name_key = %s", - (event_name, (source_name_key + 1)), + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), ) - row = cur.fetchall() - if len(row) == 0: - continue - epoc_time_sec = row[0][0] - src_name = row[0][1] - cl_flag = row[0][2] - if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover - sendControlLoopEvent( - "ONSET", - pol_url, - policy_version, - policy_name, - policy_scope, - target_type, - src_name, - epoc_time, - closed_control_loop_name, - version, - target, - ) - cl_flag = 1 - cur.execute( - "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", - (cl_flag, event_name, (source_name_key + 1)), - ) - connection_db.commit() - elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover - sendControlLoopEvent( - "ABATED", - pol_url, - policy_version, - policy_name, - policy_scope, - target_type, - src_name, - epoc_time, - closed_control_loop_name, - version, - target, - ) - cl_flag = 0 - cur.execute( - "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", - (cl_flag, event_name, (source_name_key + 1)), - ) - connection_db.commit() + connection_db.commit() + elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: + sendControlLoopEvent( + "ABATED", + pol_url, + policy_version, + policy_name, + policy_scope, + target_type, + src_name, + epoc_time, + closed_control_loop_name, + version, + target, + ) + cl_flag = 0 + cur.execute( + "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s", + (cl_flag, event_name, (source_name_key + 1)), + ) + connection_db.commit() - else: # pragma: no cover - msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name - _logger.info(msg) + else: + msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name + _logger.info(msg) + + cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,)) + cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,)) + connection_db.commit() + """ + Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 + """ + else: + msg = "DBM:Inactive instance or hb_common state is not RUNNING" + _logger.info(msg) + try: + connection_db.commit() # <--- makes sure the change is shown in the database + connection_db.close() + return True + except psycopg2.DatabaseError as e: + msg = "COMMON:Error %s" % e + _logger.error(msg) + return False + cur.close() - cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,)) - cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,)) - connection_db.commit() - """ - Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2 - """ - else: # pragma: no cover - msg = "DBM:Inactive instance or hb_common state is not RUNNING" - _logger.info(msg) - try: - connection_db.commit() # <--- makes sure the change is shown in the database - connection_db.close() - return True - except psycopg2.DatabaseError as e: - msg = "COMMON:Error %s" % e - _logger.error(msg) - return False - cur.close() - break def db_monitoring_wrapper(current_pid, jsfile, number_of_iterations=-1): @@ -37,7 +37,7 @@ limitations under the License. <groupId>org.onap.dcaegen2.services</groupId> <artifactId>heartbeat</artifactId> <name>dcaegen2-services-heartbeat</name> - <version>2.6.0</version> + <version>2.6.1</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <sonar.sources>.</sonar.sources> @@ -20,7 +20,7 @@ from setuptools import setup, find_packages setup( name="miss_htbt_service", description="Missing heartbeat microservice to communicate with policy-engine", - version="2.6.0", + version="2.6.1", # packages=find_packages(exclude=["tests.*", "tests"]), packages=find_packages(), install_requires=[ diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py index 5315a66..9d5c3c3 100644 --- a/tests/test_db_monitoring.py +++ b/tests/test_db_monitoring.py @@ -23,14 +23,134 @@ import tempfile import os import json import unittest +import time +import threading from unittest.mock import * from _pytest.outcomes import skip +from pickle import NONE _logger = logging.getLogger(__name__) class Test_db_monitoring(unittest.TestCase): + + class PseudoCursorCase1(): + # Test setup for RECONFIGURATION state + fetchall_1 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_1 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_1 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_1 = [[millisec -300 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_1 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_1 = [["RECONFIGURATION"]] + elif command.startswith ("DELETE "): + fetchall_1 = None + elif command.startswith ("UPDATE"): + fetchall_1 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_1 + return fetchall_1 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase2(): + # Test setup for RUNNING state and CL Onset condition + fetchall_2 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_2 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_2 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_2 = [[millisec -500 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_2 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_2 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_2 = None + elif command.startswith ("UPDATE"): + fetchall_2 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_2 + return fetchall_2 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase3(): + # Test setup for RUNNING state and CL Abatement condition + fetchall_3 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_3 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_3 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_3 = [[millisec -20 ,"testnodeA",1]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_3 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_3 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_3 = None + elif command.startswith ("UPDATE"): + fetchall_3 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_3 + return fetchall_3 + @classmethod + def close(ctype): + pass + + class PseudoCursorCase4(): + # Test setup for SourceNode not actively tracked for CL (validity_flag=0) + fetchall_4 = "" + @classmethod + def execute(ctype ,command,arg=None): + global fetchall_4 + if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"): + fetchall_4 = [[0,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]] + elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "): + millisec = time.time() * 1000 + fetchall_4 = [[millisec -500 ,"testnodeA",0]] + elif command.startswith ("SELECT event_name FROM vnf_table_1"): + fetchall_4 = [["Heartbeat_vDNS","Heartbeat_vFw"]] + elif command.startswith ("SELECT current_state"): + fetchall_4 = [["RUNNING"]] + elif command.startswith ("DELETE "): + fetchall_4 = None + elif command.startswith ("UPDATE"): + fetchall_4 = None + else: + raise RuntimeError("Unknown db execution") + @classmethod + def fetchall (ctype): + global fetchall_4 + return fetchall_4 + @classmethod + def close(ctype): + pass + def setUp(self): htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" @@ -57,30 +177,84 @@ class Test_db_monitoring(unittest.TestCase): ) self.assertEqual(status, True) - @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234)) + @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ-", 1234)) @patch("htbtworker.postgres_db_open") - def test_db_monitoring(self, mock1, mock2): + @patch("socket.gethostname", return_value = "XYZ") + #@patch("db_monitoring.sendControlLoopEvent", return_value = True) + def test_db_monitoring(self, mock1, mock2, mock3): status = True + mock_cursor = Mock() - mock2.cursor.return_value = mock_cursor + + ## Test setup for RECONFIGURATION state + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase1} + ) + mock2.return_value = mock_cursor + # Test for outer else when PID doesn't match db_monitoring.db_monitoring( - "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 ) self.assertEqual(status, True) + # Test for RECONFIGURATION state + mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ-", 1234) db_monitoring.db_monitoring( - "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 ) self.assertEqual(status, True) - mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234) + + ## Test for RUNNING state and CL ONSET + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase2} + ) + mock2.return_value = mock_cursor + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 + ) + self.assertEqual(status, True) + + ## Test for RUNNING state and CL Abatement condition + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase3} + ) + mock2.return_value = mock_cursor db_monitoring.db_monitoring( - "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" , 1 ) self.assertEqual(status, True) + ## # Test setup for SourceNode not actively tracked for CL (validity_flag=0) + mock_cursor.configure_mock( + **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase4} + ) + mock2.return_value = mock_cursor + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1 + ) + self.assertEqual(status, True) + + ### Test for exception with missing json file path + htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json" + os.rename(htbtworker.configjsonfile, htbtworker.configjsonfile + "_1") + t1 = threading.Thread(target=Test_db_monitoring.switch_filepath, args=(htbtworker.configjsonfile,)) + t1.start() + db_monitoring.db_monitoring( + "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 3 + ) + t1.join() + self.assertEqual(status, True) + + @classmethod + def switch_filepath(ctype, jsonfile): + #print (f"ctype: {ctype} type : {type(ctype)}") + time.sleep(5) + os.rename(htbtworker.configjsonfile+"_1", htbtworker.configjsonfile) + + def test_db_monitoring_wrapper(self): status = True db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0) - self.assertEqual(status, True) + self.assertEqual(status, True) if __name__ == "__main__": # pragma: no cover diff --git a/version.properties b/version.properties index 8201005..62c1a29 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=2 minor=6 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |