aboutsummaryrefslogtreecommitdiffstats
path: root/services/database/db_user.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/database/db_user.py')
-rw-r--r--services/database/db_user.py417
1 files changed, 417 insertions, 0 deletions
diff --git a/services/database/db_user.py b/services/database/db_user.py
new file mode 100644
index 0000000..d347dd2
--- /dev/null
+++ b/services/database/db_user.py
@@ -0,0 +1,417 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import time
+
+from django.conf import settings
+import psycopg2
+
+from services.constants import Constants
+from services.database.db_bridge import DBBridge
+from services.database.db_general import DBGeneral
+from services.database.db_virtual_function import DBVirtualFunction
+from services.frontend.base_actions.wait import Wait
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBUser:
+
+ @staticmethod
+ def get_activation_url(email):
+ # Fetch one user ID.
+ uuid = DBUser.select_user_uuid(email)
+ # Fetch one user ID.
+ index = DBGeneral.select_where_email("id", "auth_user", email)
+ activation_token = DBGeneral.select_where(
+ "activation_token", "ice_custom_user", "user_ptr_id", index, 1)
+ # / activate /:userID /:token
+ activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \
+ str(uuid) + '/' + str(activation_token)
+ logger.debug("activationUrl :" + activationUrl)
+ return activationUrl
+
+ @staticmethod
+ def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName):
+ companyId = DBGeneral.select_where(
+ "uuid", "ice_vendor", "name", companyName, 1)
+ signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \
+ "&email=" + email + "&full_name=" + fullName + \
+ "&phone_number=" + phoneNum + "&company=" + companyId
+ logger.debug("SignUpURLforContact :" + signUpURLforContact)
+ return signUpURLforContact
+
+ @staticmethod
+ def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, email)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ if result == None:
+ errorMsg = "select_where_pr_state FAILED "
+ logger.error(errorMsg)
+ raise
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def get_el_name(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadFullName = DBGeneral.select_where_and(
+ "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ return engLeadFullName
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "get_el_name FAILED "
+ raise Exception(errorMsg, "get_el_name")
+
+ @staticmethod
+ def get_email_by_full_name(fullname):
+ # try:
+ query_str = "select email from ice_user_profile where full_name = '%s';" % (
+ fullname)
+ user_email = DBGeneral.select_query(query_str)
+ return user_email
+# except: # If failed - count the failure and add the error to list of errors.
+# errorMsg = "get_email_by_full_name FAILED "
+# raise Exception(errorMsg, "get_el_name")
+
+ @staticmethod
+ def select_recent_vf_of_user(user_uuid, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % (
+ user_uuid)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_el_email(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadEmail = DBGeneral.select_where_and(
+ "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_el_email FAILED "
+ raise Exception(errorMsg, "select_el_email")
+
+ @staticmethod
+ def select_user_native_id(email):
+ try:
+ # Fetch one AT&T user ID.
+ engLeadId = DBUser.select_user_profile_property(email, "id")
+ return engLeadId
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_native_id FAILED "
+ raise Exception(errorMsg, "select_user_native_id")
+
+ @staticmethod
+ def select_personal_next_step(email):
+ user_id = DBUser.select_user_native_id(email)
+ return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1)
+
+ @staticmethod
+ def select_pr_email(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", "ice_user_profile", "id", reviewer_id, 1)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_el_email FAILED "
+ raise Exception(errorMsg, "select_el_email")
+
+ @staticmethod
+ def get_notification_id_by_email(userEmail):
+ uuid = DBGeneral.select_where_email(
+ "id", "ice_user_profile", userEmail)
+ notifIDs = DBGeneral.select_where(
+ "uuid", "ice_notification", "user_id", uuid, 0)
+ return notifIDs
+
+ @staticmethod
+ def get_not_seen_notifications_number_by_email(user_email, is_negative=False):
+ user_id = DBGeneral.select_where_email(
+ "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
+ notifications_number = DBGeneral.select_where_and(
+ Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ if is_negative:
+ counter = 0
+ while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
+ notifications_number = DBGeneral.select_where_and(
+ Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ time.sleep(1)
+ counter += 1
+ return notifications_number
+
+ @staticmethod
+ def get_eng_lead_email_per_enguuid(enguuid):
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1)
+ return engLeadEmail
+
+ @staticmethod
+ def select_all_user_engagements(engLeadID):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % (
+ engLeadID)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = cur.fetchall()
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ logger.debug(result[0][0])
+ return result[0][0]
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_engagements_by_stage FAILED "
+ raise Exception(errorMsg, "select_user_engagements_by_stage")
+
+ @staticmethod
+ def select_user_engagements_by_stage(stage, engLeadID):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % (
+ stage, engLeadID)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = cur.fetchall()
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ logger.debug(result[0][0])
+ return result[0][0]
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_engagements_by_stage FAILED "
+ raise Exception(errorMsg, "select_user_engagements_by_stage")
+
+ @staticmethod
+ def set_new_temp_password(email):
+ encodePass = DBGeneral.select_where_email(
+ "password", "auth_user", Constants.Users.Admin.EMAIL)
+ # Fetch one user ID.
+ index = DBGeneral.select_where_email("id", "auth_user", email)
+ DBGeneral.update_where(
+ "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index)
+
+ @staticmethod
+ def set_password_to_default(email):
+ encodePass = DBGeneral.select_where_email(
+ "password", "auth_user", Constants.Users.Admin.EMAIL)
+ DBGeneral.update_where(
+ "auth_user", "password", encodePass, "email", email)
+
+ @staticmethod
+ def select_el_not_in_engagement(el_name, pr_name):
+ query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % (
+ el_name, pr_name)
+ new_user = DBGeneral.select_query(query_str)
+ if new_user == 'None':
+ new_user = DBUser.update_to_el_not_in_engagement()
+ return new_user
+
+ @staticmethod
+ def select_user_uuid(email):
+ user_uuid = DBUser.select_user_profile_property(email, "uuid")
+ return user_uuid
+
+ @staticmethod
+ def select_access_key(email):
+ access_key = DBUser.select_user_profile_property(email, "rgwa_access_key")
+ return access_key
+
+ @staticmethod
+ def select_secret_key(email):
+ secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key")
+ return secret_key
+
+ @staticmethod
+ def update_to_el_not_in_engagement():
+ query_str = "select uuid from ice_user_profile where role_id = 1 ;"
+ user_uuid = DBGeneral.select_query(query_str)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % (
+ user_uuid)
+ DBGeneral.update_query(updatequery)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % (
+ 'el_for_test')
+ DBGeneral.update_query(updatequery)
+ return 'el_for_test'
+
+ @staticmethod
+ def rollback_for_el_not_in_engagement():
+ query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';"
+ user_uuid = DBGeneral.select_query(query_str)
+ fullName = DBBridge.helper_rand_string("randomString")
+ updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % (
+ fullName, user_uuid)
+ DBGeneral.update_query(updatequery)
+
+ @staticmethod
+ def set_engagement_peer_reviewer(engagement_uuid, email):
+ user_uuid = DBUser.select_user_uuid(email)
+ update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid
+ DBGeneral.update_query(update_query)
+
+ user_id = DBGeneral.select_query(
+ "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
+ update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % (
+ user_id, engagement_uuid)
+ DBGeneral.update_query(update_query)
+
+ @staticmethod
+ def select_user_profile_property(user_email, property_name):
+ return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1)
+
+ @staticmethod
+ def validate_user_profile_settings_in_db(user_email, checked):
+ Wait.page_has_loaded()
+ regular_email_updates = DBUser.select_user_profile_property(
+ user_email, 'regular_email_updates')
+ DBBridge.helper_internal_assert(regular_email_updates, checked)
+ email_updates_on_every_notification = \
+ DBUser.select_user_profile_property(
+ user_email, 'email_updates_on_every_notification')
+ DBBridge.helper_internal_assert(
+ email_updates_on_every_notification, checked)
+ email_updates_daily_digest = DBUser.select_user_profile_property(
+ user_email, 'email_updates_daily_digest')
+ DBBridge.helper_internal_assert(
+ email_updates_daily_digest, not checked)
+
+ @staticmethod
+ def retrieve_admin_ssh_from_db():
+ ssh_key = DBGeneral.select_where(
+ 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE,
+ 'email', Constants.Users.Admin.EMAIL, 1)
+ return ssh_key
+
+ @staticmethod
+ def get_access_key(user_uuid):
+ counter = 0
+ access_key = DBGeneral.select_where(
+ "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter)
+ access_key = DBGeneral.select_where(
+ "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ counter += 1
+ return access_key
+
+ @staticmethod
+ def get_access_secret(user_uuid):
+ counter = 0
+ access_secret = DBGeneral.select_where(
+ "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter)
+ access_secret = DBGeneral.select_where(
+ "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+
+ counter += 1
+ return access_secret