aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Changelog.md3
-rw-r--r--miss_htbt_service/db_monitoring.py237
-rw-r--r--pom.xml2
-rw-r--r--setup.py2
-rw-r--r--tests/test_db_monitoring.py190
-rw-r--r--version.properties2
6 files changed, 309 insertions, 127 deletions
diff --git a/Changelog.md b/Changelog.md
index 014d076..e0f6bdb 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [2.6.1] - 2023/02/15
+- [DCAEGEN2-2953] Handle no-pragma cases
+
## [2.6.0] - 2022/11/17
- [DCAEGEN2-2953] Code refactoring
- [DCAEGEN2-3321] Fix black reported formatting issues
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index 0d27a95..472c356 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -156,9 +156,10 @@ def sendControlLoopEvent(
return True
-def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name):
+
+def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_num, db_name, sleeptime=20):
while True:
- time.sleep(20)
+ time.sleep(sleeptime)
try:
with open(json_file, "r") as outfile:
@@ -168,125 +169,129 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
msg = "Json file process error : ", err
_logger.error(msg)
continue
+ db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name)
+ break
+
+def db_monitoring_singlepass (current_pid, pol_url, user_name, password, ip_address, port_num, db_name) :
+ hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
+ source_name = socket.gethostname()
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
+
+ connection_db = pm.postgres_db_open()
+ cur = connection_db.cursor()
+ if (
+ int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING"
+ ):
+ _logger.info("DBM: Active DB Monitoring Instance")
+ cur.execute("SELECT event_name FROM vnf_table_1")
+ vnf_list = [item[0] for item in cur.fetchall()]
+ for event_name in vnf_list:
+ cur.execute("SELECT current_state FROM hb_common")
+ rows = cur.fetchall()
+ hbc_state = rows[0][0]
+ if hbc_state == "RECONFIGURATION":
+ _logger.info("DBM:Waiting for hb_common state to become RUNNING")
+ break
- hbc_pid, hbc_state, hbc_src_name, hbc_time = db.read_hb_common(
- user_name, password, ip_address, port_num, db_name
- )
- source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
- connection_db = pm.postgres_db_open()
- cur = connection_db.cursor()
- if (
- int(current_pid) == int(hbc_pid) and source_name == hbc_src_name and hbc_state == "RUNNING"
- ): # pragma: no cover
- _logger.info("DBM: Active DB Monitoring Instance")
- cur.execute("SELECT event_name FROM vnf_table_1")
- vnf_list = [item[0] for item in cur.fetchall()]
- for event_name in vnf_list:
- cur.execute("SELECT current_state FROM hb_common")
- rows = cur.fetchall()
- hbc_state = rows[0][0]
- if hbc_state == "RECONFIGURATION":
- _logger.info("DBM:Waiting for hb_common state to become RUNNING")
- break
-
- cur.execute(
- "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
- "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
- "target, version FROM vnf_table_1 WHERE event_name = %s",
- (event_name,),
- )
- rows = cur.fetchall()
- validity_flag = rows[0][0]
- source_name_count = rows[0][1]
- heartbeat_interval = rows[0][2]
- heartbeat_missed_count = rows[0][3]
- closed_control_loop_name = rows[0][4]
- policy_version = rows[0][5]
- policy_name = rows[0][6]
- policy_scope = rows[0][7]
- target_type = rows[0][8]
- target = rows[0][9]
- version = rows[0][10]
- comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000
- if validity_flag == 1:
- for source_name_key in range(source_name_count):
- epoc_time = int(round(time.time() * 1000))
+ cur.execute(
+ "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
+ "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
+ "target, version FROM vnf_table_1 WHERE event_name = %s",
+ (event_name,),
+ )
+ rows = cur.fetchall()
+ validity_flag = rows[0][0]
+ source_name_count = rows[0][1]
+ heartbeat_interval = rows[0][2]
+ heartbeat_missed_count = rows[0][3]
+ closed_control_loop_name = rows[0][4]
+ policy_version = rows[0][5]
+ policy_name = rows[0][6]
+ policy_scope = rows[0][7]
+ target_type = rows[0][8]
+ target = rows[0][9]
+ version = rows[0][10]
+ comparision_time = (heartbeat_interval * heartbeat_missed_count) * 1000
+ if validity_flag == 1:
+ for source_name_key in range(source_name_count):
+ epoc_time = int(round(time.time() * 1000))
+ cur.execute(
+ "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
+ "event_name = %s AND source_name_key = %s",
+ (event_name, (source_name_key + 1)),
+ )
+ row = cur.fetchall()
+ if len(row) == 0:
+ continue
+ epoc_time_sec = row[0][0]
+ src_name = row[0][1]
+ cl_flag = row[0][2]
+ if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0:
+ sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ src_name,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
+ cl_flag = 1
cur.execute(
- "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
- "event_name = %s AND source_name_key = %s",
- (event_name, (source_name_key + 1)),
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
)
- row = cur.fetchall()
- if len(row) == 0:
- continue
- epoc_time_sec = row[0][0]
- src_name = row[0][1]
- cl_flag = row[0][2]
- if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0: # pragma: no cover
- sendControlLoopEvent(
- "ONSET",
- pol_url,
- policy_version,
- policy_name,
- policy_scope,
- target_type,
- src_name,
- epoc_time,
- closed_control_loop_name,
- version,
- target,
- )
- cl_flag = 1
- cur.execute(
- "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
- (cl_flag, event_name, (source_name_key + 1)),
- )
- connection_db.commit()
- elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1: # pragma: no cover
- sendControlLoopEvent(
- "ABATED",
- pol_url,
- policy_version,
- policy_name,
- policy_scope,
- target_type,
- src_name,
- epoc_time,
- closed_control_loop_name,
- version,
- target,
- )
- cl_flag = 0
- cur.execute(
- "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
- (cl_flag, event_name, (source_name_key + 1)),
- )
- connection_db.commit()
+ connection_db.commit()
+ elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1:
+ sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ src_name,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
+ cl_flag = 0
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
+ connection_db.commit()
- else: # pragma: no cover
- msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name
- _logger.info(msg)
+ else:
+ msg = "DBM:DB Monitoring is ignored for %s since validity flag is 0" % event_name
+ _logger.info(msg)
+
+ cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,))
+ cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,))
+ connection_db.commit()
+ """
+ Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2
+ """
+ else:
+ msg = "DBM:Inactive instance or hb_common state is not RUNNING"
+ _logger.info(msg)
+ try:
+ connection_db.commit() # <--- makes sure the change is shown in the database
+ connection_db.close()
+ return True
+ except psycopg2.DatabaseError as e:
+ msg = "COMMON:Error %s" % e
+ _logger.error(msg)
+ return False
+ cur.close()
- cur.execute("DELETE FROM vnf_table_2 WHERE EVENT_NAME = %s", (event_name,))
- cur.execute("DELETE FROM vnf_table_1 WHERE EVENT_NAME = %s", (event_name,))
- connection_db.commit()
- """
- Delete the VNF entry in table1 and delete all the source ids related to vnfs in table2
- """
- else: # pragma: no cover
- msg = "DBM:Inactive instance or hb_common state is not RUNNING"
- _logger.info(msg)
- try:
- connection_db.commit() # <--- makes sure the change is shown in the database
- connection_db.close()
- return True
- except psycopg2.DatabaseError as e:
- msg = "COMMON:Error %s" % e
- _logger.error(msg)
- return False
- cur.close()
- break
def db_monitoring_wrapper(current_pid, jsfile, number_of_iterations=-1):
diff --git a/pom.xml b/pom.xml
index 116aa4f..af9341d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@ limitations under the License.
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>heartbeat</artifactId>
<name>dcaegen2-services-heartbeat</name>
- <version>2.6.0</version>
+ <version>2.6.1</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sonar.sources>.</sonar.sources>
diff --git a/setup.py b/setup.py
index ed8c5ed..7b258f3 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@ from setuptools import setup, find_packages
setup(
name="miss_htbt_service",
description="Missing heartbeat microservice to communicate with policy-engine",
- version="2.6.0",
+ version="2.6.1",
# packages=find_packages(exclude=["tests.*", "tests"]),
packages=find_packages(),
install_requires=[
diff --git a/tests/test_db_monitoring.py b/tests/test_db_monitoring.py
index 5315a66..9d5c3c3 100644
--- a/tests/test_db_monitoring.py
+++ b/tests/test_db_monitoring.py
@@ -23,14 +23,134 @@ import tempfile
import os
import json
import unittest
+import time
+import threading
from unittest.mock import *
from _pytest.outcomes import skip
+from pickle import NONE
_logger = logging.getLogger(__name__)
class Test_db_monitoring(unittest.TestCase):
+
+ class PseudoCursorCase1():
+ # Test setup for RECONFIGURATION state
+ fetchall_1 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_1
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_1 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_1 = [[millisec -300 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_1 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_1 = [["RECONFIGURATION"]]
+ elif command.startswith ("DELETE "):
+ fetchall_1 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_1 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_1
+ return fetchall_1
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase2():
+ # Test setup for RUNNING state and CL Onset condition
+ fetchall_2 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_2
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_2 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_2 = [[millisec -500 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_2 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_2 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_2 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_2 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_2
+ return fetchall_2
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase3():
+ # Test setup for RUNNING state and CL Abatement condition
+ fetchall_3 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_3
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_3 = [[1,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_3 = [[millisec -20 ,"testnodeA",1]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_3 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_3 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_3 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_3 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_3
+ return fetchall_3
+ @classmethod
+ def close(ctype):
+ pass
+
+ class PseudoCursorCase4():
+ # Test setup for SourceNode not actively tracked for CL (validity_flag=0)
+ fetchall_4 = ""
+ @classmethod
+ def execute(ctype ,command,arg=None):
+ global fetchall_4
+ if command.startswith("SELECT validity_flag, source_name_count, heartbeat_interval,"):
+ fetchall_4 = [[0,1,300,1,"TEMP-CL", "1.0","TEMPPOLICY","TEMP","VM","TEMP","1.0"]]
+ elif command.startswith ("SELECT last_epo_time, source_name, cl_flag FROM "):
+ millisec = time.time() * 1000
+ fetchall_4 = [[millisec -500 ,"testnodeA",0]]
+ elif command.startswith ("SELECT event_name FROM vnf_table_1"):
+ fetchall_4 = [["Heartbeat_vDNS","Heartbeat_vFw"]]
+ elif command.startswith ("SELECT current_state"):
+ fetchall_4 = [["RUNNING"]]
+ elif command.startswith ("DELETE "):
+ fetchall_4 = None
+ elif command.startswith ("UPDATE"):
+ fetchall_4 = None
+ else:
+ raise RuntimeError("Unknown db execution")
+ @classmethod
+ def fetchall (ctype):
+ global fetchall_4
+ return fetchall_4
+ @classmethod
+ def close(ctype):
+ pass
+
def setUp(self):
htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
@@ -57,30 +177,84 @@ class Test_db_monitoring(unittest.TestCase):
)
self.assertEqual(status, True)
- @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ", 1234))
+ @patch("misshtbtd.read_hb_common", return_value=("1234", "RUNNING", "XYZ-", 1234))
@patch("htbtworker.postgres_db_open")
- def test_db_monitoring(self, mock1, mock2):
+ @patch("socket.gethostname", return_value = "XYZ")
+ #@patch("db_monitoring.sendControlLoopEvent", return_value = True)
+ def test_db_monitoring(self, mock1, mock2, mock3):
status = True
+
mock_cursor = Mock()
- mock2.cursor.return_value = mock_cursor
+
+ ## Test setup for RECONFIGURATION state
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase1}
+ )
+ mock2.return_value = mock_cursor
+ # Test for outer else when PID doesn't match
db_monitoring.db_monitoring(
- "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "111", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
)
self.assertEqual(status, True)
+ # Test for RECONFIGURATION state
+ mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ-", 1234)
db_monitoring.db_monitoring(
- "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
)
self.assertEqual(status, True)
- mock1.cursor.return_value = ("1234", "RECONFIGURATION", "XYZ", 1234)
+
+ ## Test for RUNNING state and CL ONSET
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase2}
+ )
+ mock2.return_value = mock_cursor
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
+ )
+ self.assertEqual(status, True)
+
+ ## Test for RUNNING state and CL Abatement condition
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase3}
+ )
+ mock2.return_value = mock_cursor
db_monitoring.db_monitoring(
- "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name"
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name" , 1
)
self.assertEqual(status, True)
+ ## # Test setup for SourceNode not actively tracked for CL (validity_flag=0)
+ mock_cursor.configure_mock(
+ **{"cursor.return_value":Test_db_monitoring.PseudoCursorCase4}
+ )
+ mock2.return_value = mock_cursor
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 1
+ )
+ self.assertEqual(status, True)
+
+ ### Test for exception with missing json file path
+ htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"
+ os.rename(htbtworker.configjsonfile, htbtworker.configjsonfile + "_1")
+ t1 = threading.Thread(target=Test_db_monitoring.switch_filepath, args=(htbtworker.configjsonfile,))
+ t1.start()
+ db_monitoring.db_monitoring(
+ "1234", htbtworker.configjsonfile, "testuser", "testpwd", "10.0.0.0", "1234", "db_name", 3
+ )
+ t1.join()
+ self.assertEqual(status, True)
+
+ @classmethod
+ def switch_filepath(ctype, jsonfile):
+ #print (f"ctype: {ctype} type : {type(ctype)}")
+ time.sleep(5)
+ os.rename(htbtworker.configjsonfile+"_1", htbtworker.configjsonfile)
+
+
def test_db_monitoring_wrapper(self):
status = True
db_monitoring.db_monitoring_wrapper("111", htbtworker.configjsonfile, number_of_iterations=0)
- self.assertEqual(status, True)
+ self.assertEqual(status, True)
if __name__ == "__main__": # pragma: no cover
diff --git a/version.properties b/version.properties
index 8201005..62c1a29 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=2
minor=6
-patch=0
+patch=1
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT