diff options
Diffstat (limited to 'services/database/db_user.py')
-rw-r--r-- | services/database/db_user.py | 515 |
1 files changed, 0 insertions, 515 deletions
diff --git a/services/database/db_user.py b/services/database/db_user.py deleted file mode 100644 index 10d02ff..0000000 --- a/services/database/db_user.py +++ /dev/null @@ -1,515 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import time - -from django.conf import settings -import psycopg2 - -from services.constants import Constants -from services.database.db_bridge import DBBridge -from services.database.db_general import DBGeneral -from services.database.db_virtual_function import DBVirtualFunction -from services.frontend.base_actions.wait import Wait -from services.logging_service import LoggingServiceFactory -from services.session import session - - -logger = LoggingServiceFactory.get_logger() - - -class DBUser: - - @staticmethod - def get_activation_url(email): - # Fetch one user ID. - uuid = DBUser.select_user_uuid(email) - # Fetch one user ID. - index = DBGeneral.select_where_email("id", "auth_user", email) - activation_token = DBGeneral.select_where( - "activation_token", "ice_custom_user", "user_ptr_id", index, 1) - # / activate /:userID /:token - activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \ - str(uuid) + '/' + str(activation_token) - logger.debug("activationUrl :" + activationUrl) - return activationUrl - - @staticmethod - def get_contact_signup_url( - invite_token, - uuid, - email, - fullName, - phoneNum, - companyName): - companyId = DBGeneral.select_where( - "uuid", "ice_vendor", "name", companyName, 1) - signUpURLforContact = settings.ICE_PORTAL_URL + \ - "#/signUp?invitation=" + invite_token + \ - "&email=" + email + "&full_name=" + fullName + \ - "&phone_number=" + phoneNum + "&company=" + companyId - logger.debug("SignUpURLforContact :" + signUpURLforContact) - return signUpURLforContact - - @staticmethod - def select_invitation_token( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - email, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = \ - "select %s from %s Where %s = '%s' and email = '%s' ;" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue, email) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - if result is None: - errorMsg = "select_where_pr_state FAILED " - logger.error(errorMsg) - raise - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def get_el_name(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadFullName = DBGeneral.select_where_and( - "full_name", "ice_user_profile", "id", reviewer_id, - "role_id", "2", 1) - return engLeadFullName - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "get_el_name FAILED " - raise Exception(errorMsg, "get_el_name") - - @staticmethod - def get_email_by_full_name(fullname): - # try: - query_str = "select email from ice_user_profile where " +\ - "full_name = '%s';" % (fullname) - user_email = DBGeneral.select_query(query_str) - return user_email - - @staticmethod - def select_recent_vf_of_user(user_uuid, fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\ - "where user_uuid = '%s' order by last_update " % user_uuid +\ - "desc limit 20;" - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def select_el_email(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadEmail = DBGeneral.select_where_and( - "email", "ice_user_profile", "id", reviewer_id, "role_id", - "2", 1) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_el_email FAILED " - raise Exception(errorMsg, "select_el_email") - - @staticmethod - def select_user_native_id(email): - try: - # Fetch one AT&T user ID. - engLeadId = DBUser.select_user_profile_property(email, "id") - return engLeadId - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_native_id FAILED " - raise Exception(errorMsg, "select_user_native_id") - - @staticmethod - def select_personal_next_step(email): - user_id = DBUser.select_user_native_id(email) - return DBGeneral.select_where( - "uuid", "ice_next_step", "owner_id", user_id, 1) - - @staticmethod - def select_pr_email(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "peer_reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadEmail = DBGeneral.select_where( - "email", "ice_user_profile", "id", reviewer_id, 1) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_el_email FAILED " - raise Exception(errorMsg, "select_el_email") - - @staticmethod - def get_notification_id_by_email(userEmail): - uuid = DBGeneral.select_where_email( - "id", "ice_user_profile", userEmail) - notifIDs = DBGeneral.select_where( - "uuid", "ice_notification", "user_id", uuid, 0) - return notifIDs - - @staticmethod - def get_not_seen_notifications_number_by_email( - user_email, is_negative=False): - user_id = DBGeneral.select_where_email( - "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email) - notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, - Constants.DBConstants.IceTables.NOTIFICATION, - "user_id", - user_id, - "is_read", - "False", - 1) - if is_negative: - counter = 0 - while notifications_number != "0" and counter <= Constants.\ - Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: - notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, - Constants.DBConstants.IceTables.NOTIFICATION, - "user_id", - user_id, - "is_read", - "False", - 1) - time.sleep(1) - counter += 1 - return notifications_number - - @staticmethod - def get_eng_lead_email_per_enguuid(enguuid): - reviewer_id = DBGeneral.select_where( - "reviewer_id", - Constants.DBConstants.IceTables.ENGAGEMENT, - "uuid", - enguuid, - 1) - engLeadEmail = DBGeneral.select_where( - "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", - reviewer_id, 1) - return engLeadEmail - - @staticmethod - def select_all_user_engagements(engLeadID): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\ - " Where iceuserprofile_id = %s" % engLeadID +\ - " and (select " +\ - "engagement_stage from public.ice_engagement " +\ - "where uuid = engagement_id LIMIT 1) != 'Archived';" - - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = cur.fetchall() - dbConn.close() - logger.debug("Query result: " + str(result)) - logger.debug(result[0][0]) - return result[0][0] - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_engagements_by_stage FAILED " - raise Exception(errorMsg, "select_user_engagements_by_stage") - - @staticmethod - def select_user_engagements_by_stage(stage, engLeadID): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select count(*) from ice_engagement INNER JOIN " +\ - "ice_engagement_engagement_team ON " +\ - "ice_engagement_engagement_team.engagement_id= " +\ - "ice_engagement.uuid Where " +\ - "(ice_engagement.engagement_stage " +\ - "= '%s') and " % stage +\ - "(ice_engagement_engagement_team.iceuserprofile_id = " +\ - "%s );" % engLeadID - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = cur.fetchall() - dbConn.close() - logger.debug("Query result: " + str(result)) - logger.debug(result[0][0]) - return result[0][0] - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_engagements_by_stage FAILED " - raise Exception(errorMsg, "select_user_engagements_by_stage") - - @staticmethod - def set_new_temp_password(email): - encodePass = DBGeneral.select_where_email( - "password", "auth_user", Constants.Users.Admin.EMAIL) - # Fetch one user ID. - index = DBGeneral.select_where_email("id", "auth_user", email) - DBGeneral.update_where( - "ice_custom_user", - "temp_password", - encodePass, - "user_ptr_id", - index) - - @staticmethod - def set_password_to_default(email): - encodePass = DBGeneral.select_where_email( - "password", "auth_user", Constants.Users.Admin.EMAIL) - DBGeneral.update_where( - "auth_user", "password", encodePass, "email", email) - - @staticmethod - def select_el_not_in_engagement(el_name, pr_name): - query_str = "select full_name from ice_user_profile where " +\ - "role_id = 2 and full_name != '%s' and full_name != '%s';" % ( - el_name, pr_name) - new_user = DBGeneral.select_query(query_str) - if new_user == 'None': - new_user = DBUser.update_to_el_not_in_engagement() - return new_user - - @staticmethod - def select_user_uuid(email): - user_uuid = DBUser.select_user_profile_property(email, "uuid") - return user_uuid - - @staticmethod - def select_access_key(email): - access_key = DBUser.select_user_profile_property( - email, "rgwa_access_key") - return access_key - - @staticmethod - def select_secret_key(email): - secret_key = DBUser.select_user_profile_property( - email, "rgwa_secret_key") - return secret_key - - @staticmethod - def update_to_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where role_id = 1 ;" - user_uuid = DBGeneral.select_query(query_str) - updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\ - " = 'el_for_test' WHERE uuid = '%s' ;" % ( - user_uuid) - DBGeneral.update_query(updatequery) - updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ - "full_name = '%s' ;" % ( - 'el_for_test') - DBGeneral.update_query(updatequery) - return 'el_for_test' - - @staticmethod - def rollback_for_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where full_name = " +\ - "'el_for_test';" - user_uuid = DBGeneral.select_query(query_str) - fullName = DBBridge.helper_rand_string("randomString") - updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\ - "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid) - DBGeneral.update_query(updatequery) - - @staticmethod - def set_engagement_peer_reviewer(engagement_uuid, email): - user_uuid = DBUser.select_user_uuid(email) - update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ - "uuid = '%s';" % user_uuid - DBGeneral.update_query(update_query) - - user_id = DBGeneral.select_query( - "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid) - update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\ - "WHERE uuid = '%s';" % ( - user_id, engagement_uuid) - DBGeneral.update_query(update_query) - - @staticmethod - def select_user_profile_property(user_email, property_name): - return DBGeneral.select_where( - property_name, - "ice_user_profile", - "email", - user_email, - 1) - - @staticmethod - def validate_user_profile_settings_in_db(user_email, checked): - Wait.page_has_loaded() - regular_email_updates = DBUser.select_user_profile_property( - user_email, 'regular_email_updates') - DBBridge.helper_internal_assert(regular_email_updates, checked) - email_updates_on_every_notification = \ - DBUser.select_user_profile_property( - user_email, 'email_updates_on_every_notification') - DBBridge.helper_internal_assert( - email_updates_on_every_notification, checked) - email_updates_daily_digest = DBUser.select_user_profile_property( - user_email, 'email_updates_daily_digest') - DBBridge.helper_internal_assert( - email_updates_daily_digest, not checked) - - @staticmethod - def retrieve_admin_ssh_from_db(): - ssh_key = DBGeneral.select_where( - 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE, - 'email', Constants.Users.Admin.EMAIL, 1) - return ssh_key - - @staticmethod - def get_access_key(user_uuid): - counter = 0 - access_key = DBGeneral.select_where( - "rgwa_access_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - while access_key == "None" and counter <= \ - Constants.RGWAConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause) - logger.debug( - "rgwa_access_key are not ready yet, trying again (%s of 20)" % - counter) - access_key = DBGeneral.select_where( - "rgwa_access_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - counter += 1 - return access_key - - @staticmethod - def get_access_secret(user_uuid): - counter = 0 - access_secret = DBGeneral.select_where( - "rgwa_secret_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - while access_secret == "None" and counter <= Constants.\ - RGWAConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause) - logger.debug( - "rgwa_secret_key are not ready yet, trying again (%s of 100)" % - counter) - access_secret = DBGeneral.select_where( - "rgwa_secret_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - - counter += 1 - return access_secret |