aboutsummaryrefslogtreecommitdiffstats
path: root/services/database/db_user.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/database/db_user.py')
-rw-r--r--services/database/db_user.py515
1 files changed, 0 insertions, 515 deletions
diff --git a/services/database/db_user.py b/services/database/db_user.py
deleted file mode 100644
index 10d02ff..0000000
--- a/services/database/db_user.py
+++ /dev/null
@@ -1,515 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import time
-
-from django.conf import settings
-import psycopg2
-
-from services.constants import Constants
-from services.database.db_bridge import DBBridge
-from services.database.db_general import DBGeneral
-from services.database.db_virtual_function import DBVirtualFunction
-from services.frontend.base_actions.wait import Wait
-from services.logging_service import LoggingServiceFactory
-from services.session import session
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBUser:
-
- @staticmethod
- def get_activation_url(email):
- # Fetch one user ID.
- uuid = DBUser.select_user_uuid(email)
- # Fetch one user ID.
- index = DBGeneral.select_where_email("id", "auth_user", email)
- activation_token = DBGeneral.select_where(
- "activation_token", "ice_custom_user", "user_ptr_id", index, 1)
- # / activate /:userID /:token
- activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \
- str(uuid) + '/' + str(activation_token)
- logger.debug("activationUrl :" + activationUrl)
- return activationUrl
-
- @staticmethod
- def get_contact_signup_url(
- invite_token,
- uuid,
- email,
- fullName,
- phoneNum,
- companyName):
- companyId = DBGeneral.select_where(
- "uuid", "ice_vendor", "name", companyName, 1)
- signUpURLforContact = settings.ICE_PORTAL_URL + \
- "#/signUp?invitation=" + invite_token + \
- "&email=" + email + "&full_name=" + fullName + \
- "&phone_number=" + phoneNum + "&company=" + companyId
- logger.debug("SignUpURLforContact :" + signUpURLforContact)
- return signUpURLforContact
-
- @staticmethod
- def select_invitation_token(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- email,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s Where %s = '%s' and email = '%s' ;" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, email)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- if result is None:
- errorMsg = "select_where_pr_state FAILED "
- logger.error(errorMsg)
- raise
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def get_el_name(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadFullName = DBGeneral.select_where_and(
- "full_name", "ice_user_profile", "id", reviewer_id,
- "role_id", "2", 1)
- return engLeadFullName
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "get_el_name FAILED "
- raise Exception(errorMsg, "get_el_name")
-
- @staticmethod
- def get_email_by_full_name(fullname):
- # try:
- query_str = "select email from ice_user_profile where " +\
- "full_name = '%s';" % (fullname)
- user_email = DBGeneral.select_query(query_str)
- return user_email
-
- @staticmethod
- def select_recent_vf_of_user(user_uuid, fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\
- "where user_uuid = '%s' order by last_update " % user_uuid +\
- "desc limit 20;"
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_el_email(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadEmail = DBGeneral.select_where_and(
- "email", "ice_user_profile", "id", reviewer_id, "role_id",
- "2", 1)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_el_email FAILED "
- raise Exception(errorMsg, "select_el_email")
-
- @staticmethod
- def select_user_native_id(email):
- try:
- # Fetch one AT&T user ID.
- engLeadId = DBUser.select_user_profile_property(email, "id")
- return engLeadId
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_native_id FAILED "
- raise Exception(errorMsg, "select_user_native_id")
-
- @staticmethod
- def select_personal_next_step(email):
- user_id = DBUser.select_user_native_id(email)
- return DBGeneral.select_where(
- "uuid", "ice_next_step", "owner_id", user_id, 1)
-
- @staticmethod
- def select_pr_email(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "peer_reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadEmail = DBGeneral.select_where(
- "email", "ice_user_profile", "id", reviewer_id, 1)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_el_email FAILED "
- raise Exception(errorMsg, "select_el_email")
-
- @staticmethod
- def get_notification_id_by_email(userEmail):
- uuid = DBGeneral.select_where_email(
- "id", "ice_user_profile", userEmail)
- notifIDs = DBGeneral.select_where(
- "uuid", "ice_notification", "user_id", uuid, 0)
- return notifIDs
-
- @staticmethod
- def get_not_seen_notifications_number_by_email(
- user_email, is_negative=False):
- user_id = DBGeneral.select_where_email(
- "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
- notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT,
- Constants.DBConstants.IceTables.NOTIFICATION,
- "user_id",
- user_id,
- "is_read",
- "False",
- 1)
- if is_negative:
- counter = 0
- while notifications_number != "0" and counter <= Constants.\
- Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
- notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT,
- Constants.DBConstants.IceTables.NOTIFICATION,
- "user_id",
- user_id,
- "is_read",
- "False",
- 1)
- time.sleep(1)
- counter += 1
- return notifications_number
-
- @staticmethod
- def get_eng_lead_email_per_enguuid(enguuid):
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- Constants.DBConstants.IceTables.ENGAGEMENT,
- "uuid",
- enguuid,
- 1)
- engLeadEmail = DBGeneral.select_where(
- "email", Constants.DBConstants.IceTables.USER_PROFILE, "id",
- reviewer_id, 1)
- return engLeadEmail
-
- @staticmethod
- def select_all_user_engagements(engLeadID):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\
- " Where iceuserprofile_id = %s" % engLeadID +\
- " and (select " +\
- "engagement_stage from public.ice_engagement " +\
- "where uuid = engagement_id LIMIT 1) != 'Archived';"
-
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = cur.fetchall()
- dbConn.close()
- logger.debug("Query result: " + str(result))
- logger.debug(result[0][0])
- return result[0][0]
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_engagements_by_stage FAILED "
- raise Exception(errorMsg, "select_user_engagements_by_stage")
-
- @staticmethod
- def select_user_engagements_by_stage(stage, engLeadID):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select count(*) from ice_engagement INNER JOIN " +\
- "ice_engagement_engagement_team ON " +\
- "ice_engagement_engagement_team.engagement_id= " +\
- "ice_engagement.uuid Where " +\
- "(ice_engagement.engagement_stage " +\
- "= '%s') and " % stage +\
- "(ice_engagement_engagement_team.iceuserprofile_id = " +\
- "%s );" % engLeadID
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = cur.fetchall()
- dbConn.close()
- logger.debug("Query result: " + str(result))
- logger.debug(result[0][0])
- return result[0][0]
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_engagements_by_stage FAILED "
- raise Exception(errorMsg, "select_user_engagements_by_stage")
-
- @staticmethod
- def set_new_temp_password(email):
- encodePass = DBGeneral.select_where_email(
- "password", "auth_user", Constants.Users.Admin.EMAIL)
- # Fetch one user ID.
- index = DBGeneral.select_where_email("id", "auth_user", email)
- DBGeneral.update_where(
- "ice_custom_user",
- "temp_password",
- encodePass,
- "user_ptr_id",
- index)
-
- @staticmethod
- def set_password_to_default(email):
- encodePass = DBGeneral.select_where_email(
- "password", "auth_user", Constants.Users.Admin.EMAIL)
- DBGeneral.update_where(
- "auth_user", "password", encodePass, "email", email)
-
- @staticmethod
- def select_el_not_in_engagement(el_name, pr_name):
- query_str = "select full_name from ice_user_profile where " +\
- "role_id = 2 and full_name != '%s' and full_name != '%s';" % (
- el_name, pr_name)
- new_user = DBGeneral.select_query(query_str)
- if new_user == 'None':
- new_user = DBUser.update_to_el_not_in_engagement()
- return new_user
-
- @staticmethod
- def select_user_uuid(email):
- user_uuid = DBUser.select_user_profile_property(email, "uuid")
- return user_uuid
-
- @staticmethod
- def select_access_key(email):
- access_key = DBUser.select_user_profile_property(
- email, "rgwa_access_key")
- return access_key
-
- @staticmethod
- def select_secret_key(email):
- secret_key = DBUser.select_user_profile_property(
- email, "rgwa_secret_key")
- return secret_key
-
- @staticmethod
- def update_to_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where role_id = 1 ;"
- user_uuid = DBGeneral.select_query(query_str)
- updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\
- " = 'el_for_test' WHERE uuid = '%s' ;" % (
- user_uuid)
- DBGeneral.update_query(updatequery)
- updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
- "full_name = '%s' ;" % (
- 'el_for_test')
- DBGeneral.update_query(updatequery)
- return 'el_for_test'
-
- @staticmethod
- def rollback_for_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where full_name = " +\
- "'el_for_test';"
- user_uuid = DBGeneral.select_query(query_str)
- fullName = DBBridge.helper_rand_string("randomString")
- updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\
- "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid)
- DBGeneral.update_query(updatequery)
-
- @staticmethod
- def set_engagement_peer_reviewer(engagement_uuid, email):
- user_uuid = DBUser.select_user_uuid(email)
- update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
- "uuid = '%s';" % user_uuid
- DBGeneral.update_query(update_query)
-
- user_id = DBGeneral.select_query(
- "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
- update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\
- "WHERE uuid = '%s';" % (
- user_id, engagement_uuid)
- DBGeneral.update_query(update_query)
-
- @staticmethod
- def select_user_profile_property(user_email, property_name):
- return DBGeneral.select_where(
- property_name,
- "ice_user_profile",
- "email",
- user_email,
- 1)
-
- @staticmethod
- def validate_user_profile_settings_in_db(user_email, checked):
- Wait.page_has_loaded()
- regular_email_updates = DBUser.select_user_profile_property(
- user_email, 'regular_email_updates')
- DBBridge.helper_internal_assert(regular_email_updates, checked)
- email_updates_on_every_notification = \
- DBUser.select_user_profile_property(
- user_email, 'email_updates_on_every_notification')
- DBBridge.helper_internal_assert(
- email_updates_on_every_notification, checked)
- email_updates_daily_digest = DBUser.select_user_profile_property(
- user_email, 'email_updates_daily_digest')
- DBBridge.helper_internal_assert(
- email_updates_daily_digest, not checked)
-
- @staticmethod
- def retrieve_admin_ssh_from_db():
- ssh_key = DBGeneral.select_where(
- 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE,
- 'email', Constants.Users.Admin.EMAIL, 1)
- return ssh_key
-
- @staticmethod
- def get_access_key(user_uuid):
- counter = 0
- access_key = DBGeneral.select_where(
- "rgwa_access_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- while access_key == "None" and counter <= \
- Constants.RGWAConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause)
- logger.debug(
- "rgwa_access_key are not ready yet, trying again (%s of 20)" %
- counter)
- access_key = DBGeneral.select_where(
- "rgwa_access_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- counter += 1
- return access_key
-
- @staticmethod
- def get_access_secret(user_uuid):
- counter = 0
- access_secret = DBGeneral.select_where(
- "rgwa_secret_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- while access_secret == "None" and counter <= Constants.\
- RGWAConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause)
- logger.debug(
- "rgwa_secret_key are not ready yet, trying again (%s of 100)" %
- counter)
- access_secret = DBGeneral.select_where(
- "rgwa_secret_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
-
- counter += 1
- return access_secret