From 2108563705a2ec8bb80029d36122c69fa4d06df5 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 1 Dec 2021 22:01:56 +0000 Subject: run the black formatting tool on python code also fix up some copyright & license block lines Change-Id: Ifb628e2ef1e5f13fed0a29964eec387d3982d605 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-2995 Signed-off-by: Hansen, Tony (th1395) --- miss_htbt_service/htbtworker.py | 114 ++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 45 deletions(-) (limited to 'miss_htbt_service/htbtworker.py') diff --git a/miss_htbt_service/htbtworker.py b/miss_htbt_service/htbtworker.py index be1b6aa..44436a2 100644 --- a/miss_htbt_service/htbtworker.py +++ b/miss_htbt_service/htbtworker.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # ============LICENSE_START======================================================= -# Copyright 2018-2020 AT&T Intellectual Property, Inc. All rights reserved. +# Copyright (c) 2018-2021 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. -# Copyright 2020 Deutsche Telekom. All rights reserved. -# Copyright 2021 Fujitsu Ltd. +# Copyright (c) 2020 Deutsche Telekom. All rights reserved. +# Copyright (c) 2021 Fujitsu Ltd. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ def read_json_file(i, prefix="../../tests"): with open(path.abspath(path.join(__file__, f"{prefix}/test2.json")), "r") as outfile: cfg = json.load(outfile) elif i == 2: - with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), 'r') as outfile: + with open(path.abspath(path.join(__file__, f"{prefix}/test3.json")), "r") as outfile: cfg = json.load(outfile) return cfg @@ -56,19 +56,21 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): sleep_duration = 20 while True: time.sleep(sleep_duration) - with open(jsfile, 'r') as outfile: + with open(jsfile, "r") as outfile: cfg = json.load(outfile) - mr_url = str(cfg['streams_subscribes']['ves-heartbeat']['dmaap_info']['topic_url']) + mr_url = str(cfg["streams_subscribes"]["ves-heartbeat"]["dmaap_info"]["topic_url"]) while True: - hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common(user_name, password, ip_address, port_num, db_name) + hbc_pid, hbc_state, hbc_srcName, hbc_time = db.read_hb_common( + user_name, password, ip_address, port_num, db_name + ) if hbc_state == "RECONFIGURATION": _logger.info("HBT:Waiting for hb_common state to become RUNNING") time.sleep(10) else: break - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": eventnameList = ["Heartbeat_vDNS", "Heartbeat_vFW", "Heartbeat_xx"] connection_db = 0 else: @@ -79,13 +81,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): msg = "\n\nHBT:eventnameList values ", eventnameList _logger.info(msg) if "groupID" not in os.environ or "consumerID" not in os.environ: - get_url = mr_url + '/DefaultGroup/1?timeout=15000' + get_url = mr_url + "/DefaultGroup/1?timeout=15000" else: - get_url = mr_url + '/' + os.getenv('groupID', "") + '/' + os.getenv('consumerID', "") + '?timeout=15000' + get_url = mr_url + "/" + os.getenv("groupID", "") + "/" + os.getenv("consumerID", "") + "?timeout=15000" msg = "HBT:Getting :" + get_url _logger.info(msg) - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jsonobj = read_json_file(i) jobj = [] jobj.append(jsonobj) @@ -103,14 +105,14 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): # If mrstatus in message body indicates some information, not json msg. if "mrstatus" in inputString: continue - jlist = inputString.split('\n') + jlist = inputString.split("\n") # Process the DMaaP input message retreived error = False for line in jlist: try: jobj = json.loads(line) except ValueError: - msg = 'HBT:Decoding JSON has failed' + msg = "HBT:Decoding JSON has failed" _logger.error(msg) error = True break @@ -120,17 +122,17 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): continue for item in jobj: try: - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": jitem = jsonobj else: jitem = json.loads(item) - srcname = (jitem['event']['commonEventHeader']['sourceName']) - lastepo = (jitem['event']['commonEventHeader']['lastEpochMicrosec']) + srcname = jitem["event"]["commonEventHeader"]["sourceName"] + lastepo = jitem["event"]["commonEventHeader"]["lastEpochMicrosec"] # if lastEpochMicrosec looks like microsec, align it with millisec if lastepo > 1000000000000000: lastepo = int(lastepo / 1000) - seqnum = (jitem['event']['commonEventHeader']['sequence']) - eventName = (jitem['event']['commonEventHeader']['eventName']) + seqnum = jitem["event"]["commonEventHeader"]["sequence"] + eventName = jitem["event"]["commonEventHeader"]["eventName"] except Exception as err: msg = "HBT message process error - ", err _logger.error(msg) @@ -140,7 +142,8 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_table_creation_check(connection_db, "vnf_table_2") is False: msg = "HBT:Creating vnf_table_2" _logger.info(msg) - cur.execute(""" + cur.execute( + """ CREATE TABLE vnf_table_2 ( EVENT_NAME varchar, SOURCE_NAME_KEY integer, @@ -148,12 +151,13 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): LAST_EPO_TIME BIGINT, SOURCE_NAME varchar, CL_FLAG integer - )""") + )""" + ) else: msg = "HBT:vnf_table_2 is already there" _logger.info(msg) if eventName in eventnameList: # pragma: no cover - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": break cur.execute("SELECT source_name_count FROM vnf_table_1 WHERE event_name = %s", (eventName,)) row = cur.fetchone() @@ -162,16 +166,22 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): cl_flag = 0 if source_name_count == 0: # pragma: no cover _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s where EVENT_NAME = %s", + (source_name_key, eventName), + ) else: # pragma: no cover msg = "HBT:event name, source_name & source_name_count are", eventName, srcname, source_name_count _logger.info(msg) for source_name_key in range(source_name_count): - cur.execute("SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " - "source_name_key = %s", (eventName, (source_name_key + 1))) + cur.execute( + "SELECT source_name FROM vnf_table_2 WHERE event_name = %s AND " "source_name_key = %s", + (eventName, (source_name_key + 1)), + ) row = cur.fetchall() if len(row) == 0: continue @@ -179,9 +189,11 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if db_srcname == srcname: msg = "HBT: Update vnf_table_2 : ", source_name_key, row _logger.info(msg) - cur.execute("UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " - "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", - (lastepo, srcname, eventName, (source_name_key + 1))) + cur.execute( + "UPDATE vnf_table_2 SET LAST_EPO_TIME = %s, SOURCE_NAME = %s " + "WHERE EVENT_NAME = %s AND SOURCE_NAME_KEY = %s", + (lastepo, srcname, eventName, (source_name_key + 1)), + ) source_name_key = source_name_count break else: @@ -191,27 +203,31 @@ def process_msg(jsfile, user_name, password, ip_address, port_num, db_name): if source_name_count == (source_name_key + 1): source_name_key = source_name_count + 1 _logger.info("HBT: Insert entry into vnf_table_2, source_name='%s'", srcname) - cur.execute("INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", - (eventName, source_name_key, lastepo, srcname, cl_flag)) - cur.execute("UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", - (source_name_key, eventName)) + cur.execute( + "INSERT INTO vnf_table_2 VALUES(%s,%s,%s,%s,%s)", + (eventName, source_name_key, lastepo, srcname, cl_flag), + ) + cur.execute( + "UPDATE vnf_table_1 SET SOURCE_NAME_COUNT = %s WHERE EVENT_NAME = %s", + (source_name_key, eventName), + ) else: _logger.info("HBT:eventName is not being monitored, Igonoring JSON message") commit_db(connection_db) commit_and_close_db(connection_db) - if os.getenv('pytest', "") != 'test': + if os.getenv("pytest", "") != "test": cur.close() def postgres_db_open(username, password, host, port, database_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True connection = psycopg2.connect(database=database_name, user=username, password=password, host=host, port=port) return connection def db_table_creation_check(connection_db, table_name): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: cur = connection_db.cursor() @@ -223,43 +239,51 @@ def db_table_creation_check(connection_db, table_name): else: return False except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) finally: cur.close() def commit_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False def commit_and_close_db(connection_db): - if os.getenv('pytest', "") == 'test': + if os.getenv("pytest", "") == "test": return True try: connection_db.commit() # <--- makes sure the change is shown in the database connection_db.close() return True except psycopg2.DatabaseError as e: - msg = 'COMMON:Error %s' % e + msg = "COMMON:Error %s" % e _logger.error(msg) return False -if __name__ == '__main__': - get_logger.configure_logger('htbtworker') +if __name__ == "__main__": + get_logger.configure_logger("htbtworker") jsfile = sys.argv[1] msg = "HBT:HeartBeat thread Created" _logger.info("HBT:HeartBeat thread Created") msg = "HBT:The config file name passed is -%s", jsfile _logger.info(msg) - ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval = db.read_hb_properties(jsfile) + ( + ip_address, + port_num, + user_name, + password, + db_name, + cbs_polling_required, + cbs_polling_interval, + ) = db.read_hb_properties(jsfile) process_msg(jsfile, user_name, password, ip_address, port_num, db_name) -- cgit 1.2.3-korg