aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/db_monitoring.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/db_monitoring.py')
-rw-r--r--miss_htbt_service/db_monitoring.py243
1 files changed, 151 insertions, 92 deletions
diff --git a/miss_htbt_service/db_monitoring.py b/miss_htbt_service/db_monitoring.py
index a405876..32a2d5c 100644
--- a/miss_htbt_service/db_monitoring.py
+++ b/miss_htbt_service/db_monitoring.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -37,81 +37,100 @@ import get_logger
_logger = logging.getLogger(__name__)
-def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_scope, target_type, srcName, epoc_time,
- closed_control_loop_name, version, target):
+def sendControlLoopEvent(
+ CLType,
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+):
msg = "DBM:Time to raise Control Loop Event for Control loop typ /target type - ", CLType, target_type
_logger.info(msg)
if CLType == "ONSET":
_logger.info("DBM:Heartbeat not received, raising alarm event")
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ONSET",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ONSET",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
elif CLType == "ABATED":
_logger.info("DBM:Heartbeat received, clearing alarm event")
# last_date_time = datetime.datetime.now()
if target_type == "VNF":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"generic-vnf.vnf-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"generic-vnf.vnf-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
elif target_type == "VM":
- json_object = json.dumps({
- "closedLoopEventClient": "DCAE_Heartbeat_MS",
- "policyVersion": policy_version,
- "policyName": policy_name,
- "policyScope": policy_scope,
- "target_type": target_type,
- "AAI": {"vserver.vserver-name": srcName},
- "closedLoopAlarmStart": epoc_time,
- "closedLoopEventStatus": "ABATED",
- "closedLoopControlName": closed_control_loop_name,
- "version": version,
- "target": target,
- "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
- "from": "DCAE"
- })
+ json_object = json.dumps(
+ {
+ "closedLoopEventClient": "DCAE_Heartbeat_MS",
+ "policyVersion": policy_version,
+ "policyName": policy_name,
+ "policyScope": policy_scope,
+ "target_type": target_type,
+ "AAI": {"vserver.vserver-name": srcName},
+ "closedLoopAlarmStart": epoc_time,
+ "closedLoopEventStatus": "ABATED",
+ "closedLoopControlName": closed_control_loop_name,
+ "version": version,
+ "target": target,
+ "requestID": "8c1b8bd8-06f7-493f-8ed7-daaa4cc481bc",
+ "from": "DCAE",
+ }
+ )
else:
return True
else:
@@ -131,7 +150,7 @@ def sendControlLoopEvent(CLType, pol_url, policy_version, policy_name, policy_sc
msg = "DBM:Status code for sending the control loop event is", ret
_logger.info(msg)
except Exception as err:
- msg = 'Message send failure : ', err
+ msg = "Message send failure : ", err
_logger.error(msg)
return True
@@ -140,22 +159,24 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
while True:
time.sleep(20)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break
try:
- with open(json_file, 'r') as outfile:
+ with open(json_file, "r") as outfile:
cfg = json.load(outfile)
- pol_url = str(cfg['streams_publishes']['dcae_cl_out']['dmaap_info']['topic_url'])
+ pol_url = str(cfg["streams_publishes"]["dcae_cl_out"]["dmaap_info"]["topic_url"])
except Exception as err:
- msg = 'Json file process error : ', err
+ msg = "Json file process error : ", err
_logger.error(msg)
continue
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
source_name = socket.gethostname()
- source_name = source_name + "-" + str(os.getenv('SERVICE_NAME', ""))
+ source_name = source_name + "-" + str(os.getenv("SERVICE_NAME", ""))
connection_db = pm.postgres_db_open(user_name, password, ip_address, port_num, db_name)
cur = connection_db.cursor()
if int(current_pid) == int(hbc_pid) and source_name == hbc_srcName and hbc_state == "RUNNING":
@@ -170,9 +191,12 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
_logger.info("DBM:Waiting for hb_common state to become RUNNING")
break
- cur.execute("SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
- "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
- "target, version FROM vnf_table_1 WHERE event_name = %s", (event_name,))
+ cur.execute(
+ "SELECT validity_flag, source_name_count, heartbeat_interval, heartbeat_missed_count, "
+ "closed_control_loop_name, policy_version, policy_name, policy_scope, target_type, "
+ "target, version FROM vnf_table_1 WHERE event_name = %s",
+ (event_name,),
+ )
rows = cur.fetchall()
validity_flag = rows[0][0]
source_name_count = rows[0][1]
@@ -189,8 +213,11 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if validity_flag == 1:
for source_name_key in range(source_name_count):
epoc_time = int(round(time.time() * 1000))
- cur.execute("SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
- "event_name = %s AND source_name_key = %s", (event_name, (source_name_key + 1)))
+ cur.execute(
+ "SELECT last_epo_time, source_name, cl_flag FROM vnf_table_2 WHERE "
+ "event_name = %s AND source_name_key = %s",
+ (event_name, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -198,20 +225,44 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
srcName = row[0][1]
cl_flag = row[0][2]
if (epoc_time - epoc_time_sec) > comparision_time and cl_flag == 0:
- sendControlLoopEvent("ONSET", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ONSET",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 1
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
elif (epoc_time - epoc_time_sec) < comparision_time and cl_flag == 1:
- sendControlLoopEvent("ABATED", pol_url, policy_version, policy_name, policy_scope,
- target_type, srcName, epoc_time, closed_control_loop_name, version,
- target)
+ sendControlLoopEvent(
+ "ABATED",
+ pol_url,
+ policy_version,
+ policy_name,
+ policy_scope,
+ target_type,
+ srcName,
+ epoc_time,
+ closed_control_loop_name,
+ version,
+ target,
+ )
cl_flag = 0
- cur.execute("UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND "
- "source_name_key = %s", (cl_flag, event_name, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET CL_FLAG = %s WHERE EVENT_NAME = %s AND " "source_name_key = %s",
+ (cl_flag, event_name, (source_name_key + 1)),
+ )
connection_db.commit()
else: # pragma: no cover
@@ -233,15 +284,23 @@ def db_monitoring(current_pid, json_file, user_name, password, ip_address, port_
if __name__ == "__main__":
- get_logger.configure_logger('db_monitoring')
+ get_logger.configure_logger("db_monitoring")
_logger.info("DBM: DBM Process started")
current_pid = sys.argv[1]
jsfile = sys.argv[2]
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
msg = "DBM:Parent process ID and json file name", current_pid, jsfile
_logger.info(msg)
while True:
db_monitoring(current_pid, jsfile, user_name, password, ip_address, port_num, db_name)
- envPytest = os.getenv('pytest', "")
- if envPytest == 'test':
+ envPytest = os.getenv("pytest", "")
+ if envPytest == "test":
break