aboutsummaryrefslogtreecommitdiffstats
path: root/miss_htbt_service/htbtworker.py
diff options
context:
space:
mode:
Diffstat (limited to 'miss_htbt_service/htbtworker.py')
-rw-r--r--miss_htbt_service/htbtworker.py114
1 files changed, 69 insertions, 45 deletions
diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py
index be1b6aa..44436a2 100644
--- a/miss_htbt_service/htbtworker.py
+++ b/miss_htbt_service/htbtworker.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# ============LICENSE_START=======================================================
-# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved.
+# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# Copyright 2020 Deutsche Telekom. All rights reserved.
-# Copyright 2021 Fujitsu Ltd.
+# Copyright (c) 2020 Deutsche Telekom. All rights reserved.
+# Copyright (c) 2021 Fujitsu Ltd.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"):
with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile:
cfg = json.load(outfile)
elif i == 2:
- with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile:
+ with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile:
cfg = json.load(outfile)
return cfg
@@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
sleep_duration = 20
while True:
time.sleep(sleep_duration)
- with open(jsfile, 'r') as outfile:
+ with open(jsfile, "r") as outfile:
cfg = json.load(outfile)
- mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url'])
+ mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"])
while True:
- hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name)
+ hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(
+ user_name, password, ip_address, port_num, db_name
+ )
if hbc_state == "RECONFIGURATION":
_logger.info("HBT:Waiting for hb_common state to become RUNNING")
time.sleep(10)
else:
break
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"]
connection_db = 0
else:
@@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
msg = "\n\nHBT:eventnameList values ", eventnameList
_logger.info(msg)
if "groupID" not in os.environ or "consumerID" not in os.environ:
- get_url = mr_url + '/DefaultGroup/1?timeout=15000'
+ get_url = mr_url + "/DefaultGroup/1?timeout=15000"
else:
- get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000'
+ get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000"
msg = "HBT:Getting :" + get_url
_logger.info(msg)
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jsonobj = read_json_file(i)
jobj = []
jobj.append(jsonobj)
@@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
# If mrstatus in message body indicates some information, not json msg.
if "mrstatus" in inputString:
continue
- jlist = inputString.split('\n')
+ jlist = inputString.split("\n")
# Process the DMaaP input message retreived
error = False
for line in jlist:
try:
jobj = json.loads(line)
except ValueError:
- msg = 'HBT:Decoding JSON has failed'
+ msg = "HBT:Decoding JSON has failed"
_logger.error(msg)
error = True
break
@@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
continue
for item in jobj:
try:
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
jitem = jsonobj
else:
jitem = json.loads(item)
- srcname = (jitem['event']['commonEventHeader']['sourceName'])
- lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec'])
+ srcname = jitem["event"]["commonEventHeader"]["sourceName"]
+ lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"]
# if lastEpochMicrosec looks like microsec, align it with millisec
if lastepo > 1000000000000000:
lastepo = int(lastepo / 1000)
- seqnum = (jitem['event']['commonEventHeader']['sequence'])
- eventName = (jitem['event']['commonEventHeader']['eventName'])
+ seqnum = jitem["event"]["commonEventHeader"]["sequence"]
+ eventName = jitem["event"]["commonEventHeader"]["eventName"]
except Exception as err:
msg = "HBT message process error - ", err
_logger.error(msg)
@@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_table_creation_check(connection_db, "vnf_table_2") is False:
msg = "HBT:Creating vnf_table_2"
_logger.info(msg)
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE vnf_table_2 (
EVENT_NAME varchar,
SOURCE_NAME_KEY integer,
@@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
LAST_EPO_TIME BIGINT,
SOURCE_NAME varchar,
CL_FLAG integer
- )""")
+ )"""
+ )
else:
msg = "HBT:vnf_table_2 is already there"
_logger.info(msg)
if eventName in eventnameList: # pragma: no cover
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
break
cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,))
row = cur.fetchone()
@@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
cl_flag = 0
if source_name_count == 0: # pragma: no cover
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else: # pragma: no cover
msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count
_logger.info(msg)
for source_name_key in range(source_name_count):
- cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND "
- "source_name_key = %s", (eventName, (source_name_key + 1)))
+ cur.execute(
+ "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s",
+ (eventName, (source_name_key + 1)),
+ )
row = cur.fetchall()
if len(row) == 0:
continue
@@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if db_srcname == srcname:
msg = "HBT: Update vnf_table_2 : ", source_name_key, row
_logger.info(msg)
- cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
- "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
- (lastepo, srcname, eventName, (source_name_key + 1)))
+ cur.execute(
+ "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s "
+ "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s",
+ (lastepo, srcname, eventName, (source_name_key + 1)),
+ )
source_name_key = source_name_count
break
else:
@@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name):
if source_name_count == (source_name_key + 1):
source_name_key = source_name_count + 1
_logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname)
- cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
- (eventName, source_name_key, lastepo, srcname, cl_flag))
- cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
- (source_name_key, eventName))
+ cur.execute(
+ "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)",
+ (eventName, source_name_key, lastepo, srcname, cl_flag),
+ )
+ cur.execute(
+ "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s",
+ (source_name_key, eventName),
+ )
else:
_logger.info("HBT:eventName is not being monitored, Igonoring JSON message")
commit_db(connection_db)
commit_and_close_db(connection_db)
- if os.getenv('pytest', "") != 'test':
+ if os.getenv("pytest", "") != "test":
cur.close()
def postgres_db_open(username, password, host, port, database_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port)
return connection
def db_table_creation_check(connection_db, table_name):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
cur = connection_db.cursor()
@@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name):
else:
return False
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
finally:
cur.close()
def commit_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
def commit_and_close_db(connection_db):
- if os.getenv('pytest', "") == 'test':
+ if os.getenv("pytest", "") == "test":
return True
try:
connection_db.commit() # <--- makes sure the change is shown in the database
connection_db.close()
return True
except psycopg2.DatabaseError as e:
- msg = 'COMMON:Error %s' % e
+ msg = "COMMON:Error %s" % e
_logger.error(msg)
return False
-if __name__ == '__main__':
- get_logger.configure_logger('htbtworker')
+if __name__ == "__main__":
+ get_logger.configure_logger("htbtworker")
jsfile = sys.argv[1]
msg = "HBT:HeartBeat thread Created"
_logger.info("HBT:HeartBeat thread Created")
msg = "HBT:The config file name passed is -%s", jsfile
_logger.info(msg)
- ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile)
+ (
+ ip_address,
+ port_num,
+ user_name,
+ password,
+ db_name,
+ cbs_polling_required,
+ cbs_polling_interval,
+ ) = db.read_hb_properties(jsfile)
process_msg(jsfile, user_name, password, ip_address, port_num, db_name)