diff options
Diffstat (limited to 'services/database/db_user.py')
-rw-r--r-- | services/database/db_user.py | 238 |
1 files changed, 168 insertions, 70 deletions
diff --git a/services/database/db_user.py b/services/database/db_user.py index d347dd2..10d02ff 100644 --- a/services/database/db_user.py +++ b/services/database/db_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -52,6 +52,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class DBUser: @staticmethod @@ -69,24 +70,39 @@ class DBUser: return activationUrl @staticmethod - def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName): + def get_contact_signup_url( + invite_token, + uuid, + email, + fullName, + phoneNum, + companyName): companyId = DBGeneral.select_where( "uuid", "ice_vendor", "name", companyName, 1) - signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \ + signUpURLforContact = settings.ICE_PORTAL_URL + \ + "#/signUp?invitation=" + invite_token + \ "&email=" + email + "&full_name=" + fullName + \ "&phone_number=" + phoneNum + "&company=" + companyId logger.debug("SignUpURLforContact :" + signUpURLforContact) return signUpURLforContact @staticmethod - def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum): + def select_invitation_token( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + email, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue, email) + queryStr = \ + "select %s from %s Where %s = '%s' and email = '%s' ;" % ( + queryColumnName, queryTableName, whereParametrType, + whereParametrValue, email) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -98,14 +114,14 @@ class DBUser: elif(result.find(",)") != -1): # formatting ints e.g id result = result.partition('(')[-1].rpartition(',')[0] dbConn.close() - if result == None: + if result is None: errorMsg = "select_where_pr_state FAILED " logger.error(errorMsg) raise logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @@ -115,27 +131,30 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadFullName = DBGeneral.select_where_and( - "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + "full_name", "ice_user_profile", "id", reviewer_id, + "role_id", "2", 1) return engLeadFullName # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "get_el_name FAILED " raise Exception(errorMsg, "get_el_name") @staticmethod def get_email_by_full_name(fullname): # try: - query_str = "select email from ice_user_profile where full_name = '%s';" % ( - fullname) + query_str = "select email from ice_user_profile where " +\ + "full_name = '%s';" % (fullname) user_email = DBGeneral.select_query(query_str) return user_email -# except: # If failed - count the failure and add the error to list of errors. -# errorMsg = "get_email_by_full_name FAILED " -# raise Exception(errorMsg, "get_el_name") @staticmethod def select_recent_vf_of_user(user_uuid, fetchNum): @@ -144,8 +163,9 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % ( - user_uuid) + queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\ + "where user_uuid = '%s' order by last_update " % user_uuid +\ + "desc limit 20;" logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -160,7 +180,7 @@ class DBUser: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @@ -170,14 +190,20 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadEmail = DBGeneral.select_where_and( - "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + "email", "ice_user_profile", "id", reviewer_id, "role_id", + "2", 1) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_el_email FAILED " raise Exception(errorMsg, "select_el_email") @@ -188,14 +214,15 @@ class DBUser: engLeadId = DBUser.select_user_profile_property(email, "id") return engLeadId # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_native_id FAILED " raise Exception(errorMsg, "select_user_native_id") @staticmethod def select_personal_next_step(email): user_id = DBUser.select_user_native_id(email) - return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1) + return DBGeneral.select_where( + "uuid", "ice_next_step", "owner_id", user_id, 1) @staticmethod def select_pr_email(vfName): @@ -203,14 +230,19 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "peer_reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadEmail = DBGeneral.select_where( "email", "ice_user_profile", "id", reviewer_id, 1) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_el_email FAILED " raise Exception(errorMsg, "select_el_email") @@ -223,16 +255,30 @@ class DBUser: return notifIDs @staticmethod - def get_not_seen_notifications_number_by_email(user_email, is_negative=False): + def get_not_seen_notifications_number_by_email( + user_email, is_negative=False): user_id = DBGeneral.select_where_email( "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email) notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + Constants.DBConstants.Queries.COUNT, + Constants.DBConstants.IceTables.NOTIFICATION, + "user_id", + user_id, + "is_read", + "False", + 1) if is_negative: counter = 0 - while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: + while notifications_number != "0" and counter <= Constants.\ + Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + Constants.DBConstants.Queries.COUNT, + Constants.DBConstants.IceTables.NOTIFICATION, + "user_id", + user_id, + "is_read", + "False", + 1) time.sleep(1) counter += 1 return notifications_number @@ -240,9 +286,14 @@ class DBUser: @staticmethod def get_eng_lead_email_per_enguuid(enguuid): reviewer_id = DBGeneral.select_where( - "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1) + "reviewer_id", + Constants.DBConstants.IceTables.ENGAGEMENT, + "uuid", + enguuid, + 1) engLeadEmail = DBGeneral.select_where( - "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1) + "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", + reviewer_id, 1) return engLeadEmail @staticmethod @@ -252,8 +303,12 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % ( - engLeadID) + queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\ + " Where iceuserprofile_id = %s" % engLeadID +\ + " and (select " +\ + "engagement_stage from public.ice_engagement " +\ + "where uuid = engagement_id LIMIT 1) != 'Archived';" + logger.debug("Query : " + queryStr) cur.execute(queryStr) result = cur.fetchall() @@ -262,7 +317,7 @@ class DBUser: logger.debug(result[0][0]) return result[0][0] # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_engagements_by_stage FAILED " raise Exception(errorMsg, "select_user_engagements_by_stage") @@ -273,8 +328,14 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % ( - stage, engLeadID) + queryStr = "select count(*) from ice_engagement INNER JOIN " +\ + "ice_engagement_engagement_team ON " +\ + "ice_engagement_engagement_team.engagement_id= " +\ + "ice_engagement.uuid Where " +\ + "(ice_engagement.engagement_stage " +\ + "= '%s') and " % stage +\ + "(ice_engagement_engagement_team.iceuserprofile_id = " +\ + "%s );" % engLeadID logger.debug("Query : " + queryStr) cur.execute(queryStr) result = cur.fetchall() @@ -283,7 +344,7 @@ class DBUser: logger.debug(result[0][0]) return result[0][0] # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_engagements_by_stage FAILED " raise Exception(errorMsg, "select_user_engagements_by_stage") @@ -294,7 +355,11 @@ class DBUser: # Fetch one user ID. index = DBGeneral.select_where_email("id", "auth_user", email) DBGeneral.update_where( - "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index) + "ice_custom_user", + "temp_password", + encodePass, + "user_ptr_id", + index) @staticmethod def set_password_to_default(email): @@ -305,8 +370,9 @@ class DBUser: @staticmethod def select_el_not_in_engagement(el_name, pr_name): - query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % ( - el_name, pr_name) + query_str = "select full_name from ice_user_profile where " +\ + "role_id = 2 and full_name != '%s' and full_name != '%s';" % ( + el_name, pr_name) new_user = DBGeneral.select_query(query_str) if new_user == 'None': new_user = DBUser.update_to_el_not_in_engagement() @@ -316,53 +382,65 @@ class DBUser: def select_user_uuid(email): user_uuid = DBUser.select_user_profile_property(email, "uuid") return user_uuid - + @staticmethod def select_access_key(email): - access_key = DBUser.select_user_profile_property(email, "rgwa_access_key") + access_key = DBUser.select_user_profile_property( + email, "rgwa_access_key") return access_key - + @staticmethod def select_secret_key(email): - secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key") + secret_key = DBUser.select_user_profile_property( + email, "rgwa_secret_key") return secret_key - + @staticmethod def update_to_el_not_in_engagement(): query_str = "select uuid from ice_user_profile where role_id = 1 ;" user_uuid = DBGeneral.select_query(query_str) - updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % ( - user_uuid) + updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\ + " = 'el_for_test' WHERE uuid = '%s' ;" % ( + user_uuid) DBGeneral.update_query(updatequery) - updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % ( - 'el_for_test') + updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ + "full_name = '%s' ;" % ( + 'el_for_test') DBGeneral.update_query(updatequery) return 'el_for_test' @staticmethod def rollback_for_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';" + query_str = "select uuid from ice_user_profile where full_name = " +\ + "'el_for_test';" user_uuid = DBGeneral.select_query(query_str) fullName = DBBridge.helper_rand_string("randomString") - updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % ( - fullName, user_uuid) + updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\ + "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid) DBGeneral.update_query(updatequery) @staticmethod def set_engagement_peer_reviewer(engagement_uuid, email): user_uuid = DBUser.select_user_uuid(email) - update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid + update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ + "uuid = '%s';" % user_uuid DBGeneral.update_query(update_query) user_id = DBGeneral.select_query( "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid) - update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % ( - user_id, engagement_uuid) + update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\ + "WHERE uuid = '%s';" % ( + user_id, engagement_uuid) DBGeneral.update_query(update_query) @staticmethod def select_user_profile_property(user_email, property_name): - return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1) + return DBGeneral.select_where( + property_name, + "ice_user_profile", + "email", + user_email, + 1) @staticmethod def validate_user_profile_settings_in_db(user_email, checked): @@ -391,13 +469,23 @@ class DBUser: def get_access_key(user_uuid): counter = 0 access_key = DBGeneral.select_where( - "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + "rgwa_access_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + while access_key == "None" and counter <= \ + Constants.RGWAConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) logger.debug( - "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter) + "rgwa_access_key are not ready yet, trying again (%s of 20)" % + counter) access_key = DBGeneral.select_where( - "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + "rgwa_access_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) counter += 1 return access_key @@ -405,13 +493,23 @@ class DBUser: def get_access_secret(user_uuid): counter = 0 access_secret = DBGeneral.select_where( - "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + "rgwa_secret_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + while access_secret == "None" and counter <= Constants.\ + RGWAConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) logger.debug( - "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter) + "rgwa_secret_key are not ready yet, trying again (%s of 100)" % + counter) access_secret = DBGeneral.select_where( - "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - + "rgwa_secret_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + counter += 1 return access_secret |