aboutsummaryrefslogtreecommitdiffstats
path: root/services/database/db_user.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/database/db_user.py')
-rw-r--r--services/database/db_user.py238
1 files changed, 168 insertions, 70 deletions
diff --git a/services/database/db_user.py b/services/database/db_user.py
index d347dd2..10d02ff 100644
--- a/services/database/db_user.py
+++ b/services/database/db_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -52,6 +52,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class DBUser:
@staticmethod
@@ -69,24 +70,39 @@ class DBUser:
return activationUrl
@staticmethod
- def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName):
+ def get_contact_signup_url(
+ invite_token,
+ uuid,
+ email,
+ fullName,
+ phoneNum,
+ companyName):
companyId = DBGeneral.select_where(
"uuid", "ice_vendor", "name", companyName, 1)
- signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \
+ signUpURLforContact = settings.ICE_PORTAL_URL + \
+ "#/signUp?invitation=" + invite_token + \
"&email=" + email + "&full_name=" + fullName + \
"&phone_number=" + phoneNum + "&company=" + companyId
logger.debug("SignUpURLforContact :" + signUpURLforContact)
return signUpURLforContact
@staticmethod
- def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum):
+ def select_invitation_token(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ email,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, email)
+ queryStr = \
+ "select %s from %s Where %s = '%s' and email = '%s' ;" % (
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, email)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -98,14 +114,14 @@ class DBUser:
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
- if result == None:
+ if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -115,27 +131,30 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadFullName = DBGeneral.select_where_and(
- "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "full_name", "ice_user_profile", "id", reviewer_id,
+ "role_id", "2", 1)
return engLeadFullName
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_el_name FAILED "
raise Exception(errorMsg, "get_el_name")
@staticmethod
def get_email_by_full_name(fullname):
# try:
- query_str = "select email from ice_user_profile where full_name = '%s';" % (
- fullname)
+ query_str = "select email from ice_user_profile where " +\
+ "full_name = '%s';" % (fullname)
user_email = DBGeneral.select_query(query_str)
return user_email
-# except: # If failed - count the failure and add the error to list of errors.
-# errorMsg = "get_email_by_full_name FAILED "
-# raise Exception(errorMsg, "get_el_name")
@staticmethod
def select_recent_vf_of_user(user_uuid, fetchNum):
@@ -144,8 +163,9 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % (
- user_uuid)
+ queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\
+ "where user_uuid = '%s' order by last_update " % user_uuid +\
+ "desc limit 20;"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -160,7 +180,7 @@ class DBUser:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -170,14 +190,20 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where_and(
- "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "email", "ice_user_profile", "id", reviewer_id, "role_id",
+ "2", 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -188,14 +214,15 @@ class DBUser:
engLeadId = DBUser.select_user_profile_property(email, "id")
return engLeadId
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_native_id FAILED "
raise Exception(errorMsg, "select_user_native_id")
@staticmethod
def select_personal_next_step(email):
user_id = DBUser.select_user_native_id(email)
- return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_next_step", "owner_id", user_id, 1)
@staticmethod
def select_pr_email(vfName):
@@ -203,14 +230,19 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "peer_reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", reviewer_id, 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -223,16 +255,30 @@ class DBUser:
return notifIDs
@staticmethod
- def get_not_seen_notifications_number_by_email(user_email, is_negative=False):
+ def get_not_seen_notifications_number_by_email(
+ user_email, is_negative=False):
user_id = DBGeneral.select_where_email(
"id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
if is_negative:
counter = 0
- while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
+ while notifications_number != "0" and counter <= Constants.\
+ Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
time.sleep(1)
counter += 1
return notifications_number
@@ -240,9 +286,14 @@ class DBUser:
@staticmethod
def get_eng_lead_email_per_enguuid(enguuid):
reviewer_id = DBGeneral.select_where(
- "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1)
+ "reviewer_id",
+ Constants.DBConstants.IceTables.ENGAGEMENT,
+ "uuid",
+ enguuid,
+ 1)
engLeadEmail = DBGeneral.select_where(
- "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1)
+ "email", Constants.DBConstants.IceTables.USER_PROFILE, "id",
+ reviewer_id, 1)
return engLeadEmail
@staticmethod
@@ -252,8 +303,12 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % (
- engLeadID)
+ queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\
+ " Where iceuserprofile_id = %s" % engLeadID +\
+ " and (select " +\
+ "engagement_stage from public.ice_engagement " +\
+ "where uuid = engagement_id LIMIT 1) != 'Archived';"
+
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -262,7 +317,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -273,8 +328,14 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % (
- stage, engLeadID)
+ queryStr = "select count(*) from ice_engagement INNER JOIN " +\
+ "ice_engagement_engagement_team ON " +\
+ "ice_engagement_engagement_team.engagement_id= " +\
+ "ice_engagement.uuid Where " +\
+ "(ice_engagement.engagement_stage " +\
+ "= '%s') and " % stage +\
+ "(ice_engagement_engagement_team.iceuserprofile_id = " +\
+ "%s );" % engLeadID
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -283,7 +344,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -294,7 +355,11 @@ class DBUser:
# Fetch one user ID.
index = DBGeneral.select_where_email("id", "auth_user", email)
DBGeneral.update_where(
- "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index)
+ "ice_custom_user",
+ "temp_password",
+ encodePass,
+ "user_ptr_id",
+ index)
@staticmethod
def set_password_to_default(email):
@@ -305,8 +370,9 @@ class DBUser:
@staticmethod
def select_el_not_in_engagement(el_name, pr_name):
- query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % (
- el_name, pr_name)
+ query_str = "select full_name from ice_user_profile where " +\
+ "role_id = 2 and full_name != '%s' and full_name != '%s';" % (
+ el_name, pr_name)
new_user = DBGeneral.select_query(query_str)
if new_user == 'None':
new_user = DBUser.update_to_el_not_in_engagement()
@@ -316,53 +382,65 @@ class DBUser:
def select_user_uuid(email):
user_uuid = DBUser.select_user_profile_property(email, "uuid")
return user_uuid
-
+
@staticmethod
def select_access_key(email):
- access_key = DBUser.select_user_profile_property(email, "rgwa_access_key")
+ access_key = DBUser.select_user_profile_property(
+ email, "rgwa_access_key")
return access_key
-
+
@staticmethod
def select_secret_key(email):
- secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key")
+ secret_key = DBUser.select_user_profile_property(
+ email, "rgwa_secret_key")
return secret_key
-
+
@staticmethod
def update_to_el_not_in_engagement():
query_str = "select uuid from ice_user_profile where role_id = 1 ;"
user_uuid = DBGeneral.select_query(query_str)
- updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % (
- user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\
+ " = 'el_for_test' WHERE uuid = '%s' ;" % (
+ user_uuid)
DBGeneral.update_query(updatequery)
- updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % (
- 'el_for_test')
+ updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "full_name = '%s' ;" % (
+ 'el_for_test')
DBGeneral.update_query(updatequery)
return 'el_for_test'
@staticmethod
def rollback_for_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';"
+ query_str = "select uuid from ice_user_profile where full_name = " +\
+ "'el_for_test';"
user_uuid = DBGeneral.select_query(query_str)
fullName = DBBridge.helper_rand_string("randomString")
- updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % (
- fullName, user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\
+ "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid)
DBGeneral.update_query(updatequery)
@staticmethod
def set_engagement_peer_reviewer(engagement_uuid, email):
user_uuid = DBUser.select_user_uuid(email)
- update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid
+ update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "uuid = '%s';" % user_uuid
DBGeneral.update_query(update_query)
user_id = DBGeneral.select_query(
"SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
- update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % (
- user_id, engagement_uuid)
+ update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\
+ "WHERE uuid = '%s';" % (
+ user_id, engagement_uuid)
DBGeneral.update_query(update_query)
@staticmethod
def select_user_profile_property(user_email, property_name):
- return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1)
+ return DBGeneral.select_where(
+ property_name,
+ "ice_user_profile",
+ "email",
+ user_email,
+ 1)
@staticmethod
def validate_user_profile_settings_in_db(user_email, checked):
@@ -391,13 +469,23 @@ class DBUser:
def get_access_key(user_uuid):
counter = 0
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_key == "None" and counter <= \
+ Constants.RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter)
+ "rgwa_access_key are not ready yet, trying again (%s of 20)" %
+ counter)
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
counter += 1
return access_key
@@ -405,13 +493,23 @@ class DBUser:
def get_access_secret(user_uuid):
counter = 0
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_secret == "None" and counter <= Constants.\
+ RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter)
+ "rgwa_secret_key are not ready yet, trying again (%s of 100)" %
+ counter)
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
-
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+
counter += 1
return access_secret